Repository: spark
Updated Branches:
  refs/heads/master 7c7c7529a -> c8677d736


[SPARK-9958] [SQL] Make HiveThriftServer2Listener thread-safe and update the 
tab name to "JDBC/ODBC Server"

This PR fixed the thread-safe issue of HiveThriftServer2Listener, and also 
changed the tab name to "JDBC/ODBC Server" since it's conflict with the new SQL 
tab.

<img width="1377" alt="thriftserver" 
src="https://cloud.githubusercontent.com/assets/1000778/9265707/c46f3f2c-4269-11e5-8d7e-888c9113ab4f.png";>

Author: zsxwing <[email protected]>

Closes #8185 from zsxwing/SPARK-9958.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8677d73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8677d73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8677d73

Branch: refs/heads/master
Commit: c8677d73666850b37ff937520e538650632ce304
Parents: 7c7c752
Author: zsxwing <[email protected]>
Authored: Fri Aug 14 14:41:53 2015 +0800
Committer: Cheng Lian <[email protected]>
Committed: Fri Aug 14 14:41:53 2015 +0800

----------------------------------------------------------------------
 .../hive/thriftserver/HiveThriftServer2.scala   | 64 ++++++++++++--------
 .../hive/thriftserver/ui/ThriftServerPage.scala | 32 +++++-----
 .../ui/ThriftServerSessionPage.scala            | 38 ++++++------
 .../hive/thriftserver/ui/ThriftServerTab.scala  |  4 +-
 4 files changed, 78 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8677d73/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 2c9fa59..dd9fef9 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -152,16 +152,26 @@ object HiveThriftServer2 extends Logging {
     override def onApplicationEnd(applicationEnd: 
SparkListenerApplicationEnd): Unit = {
       server.stop()
     }
-    var onlineSessionNum: Int = 0
-    val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
-    val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
-    val retainedStatements =
-      conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
-    val retainedSessions =
-      conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
-    var totalRunning = 0
-
-    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+    private var onlineSessionNum: Int = 0
+    private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
+    private val executionList = new mutable.LinkedHashMap[String, 
ExecutionInfo]
+    private val retainedStatements = 
conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
+    private val retainedSessions = 
conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
+    private var totalRunning = 0
+
+    def getOnlineSessionNum: Int = synchronized { onlineSessionNum }
+
+    def getTotalRunning: Int = synchronized { totalRunning }
+
+    def getSessionList: Seq[SessionInfo] = synchronized { 
sessionList.values.toSeq }
+
+    def getSession(sessionId: String): Option[SessionInfo] = synchronized {
+      sessionList.get(sessionId)
+    }
+
+    def getExecutionList: Seq[ExecutionInfo] = synchronized { 
executionList.values.toSeq }
+
+    override def onJobStart(jobStart: SparkListenerJobStart): Unit = 
synchronized {
       for {
         props <- Option(jobStart.properties)
         groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -173,13 +183,15 @@ object HiveThriftServer2 extends Logging {
     }
 
     def onSessionCreated(ip: String, sessionId: String, userName: String = 
"UNKNOWN"): Unit = {
-      val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, 
userName)
-      sessionList.put(sessionId, info)
-      onlineSessionNum += 1
-      trimSessionIfNecessary()
+      synchronized {
+        val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, 
userName)
+        sessionList.put(sessionId, info)
+        onlineSessionNum += 1
+        trimSessionIfNecessary()
+      }
     }
 
-    def onSessionClosed(sessionId: String): Unit = {
+    def onSessionClosed(sessionId: String): Unit = synchronized {
       sessionList(sessionId).finishTimestamp = System.currentTimeMillis
       onlineSessionNum -= 1
       trimSessionIfNecessary()
@@ -190,7 +202,7 @@ object HiveThriftServer2 extends Logging {
         sessionId: String,
         statement: String,
         groupId: String,
-        userName: String = "UNKNOWN"): Unit = {
+        userName: String = "UNKNOWN"): Unit = synchronized {
       val info = new ExecutionInfo(statement, sessionId, 
System.currentTimeMillis, userName)
       info.state = ExecutionState.STARTED
       executionList.put(id, info)
@@ -200,27 +212,29 @@ object HiveThriftServer2 extends Logging {
       totalRunning += 1
     }
 
-    def onStatementParsed(id: String, executionPlan: String): Unit = {
+    def onStatementParsed(id: String, executionPlan: String): Unit = 
synchronized {
       executionList(id).executePlan = executionPlan
       executionList(id).state = ExecutionState.COMPILED
     }
 
     def onStatementError(id: String, errorMessage: String, errorTrace: 
String): Unit = {
-      executionList(id).finishTimestamp = System.currentTimeMillis
-      executionList(id).detail = errorMessage
-      executionList(id).state = ExecutionState.FAILED
-      totalRunning -= 1
-      trimExecutionIfNecessary()
+      synchronized {
+        executionList(id).finishTimestamp = System.currentTimeMillis
+        executionList(id).detail = errorMessage
+        executionList(id).state = ExecutionState.FAILED
+        totalRunning -= 1
+        trimExecutionIfNecessary()
+      }
     }
 
-    def onStatementFinish(id: String): Unit = {
+    def onStatementFinish(id: String): Unit = synchronized {
       executionList(id).finishTimestamp = System.currentTimeMillis
       executionList(id).state = ExecutionState.FINISHED
       totalRunning -= 1
       trimExecutionIfNecessary()
     }
 
-    private def trimExecutionIfNecessary() = synchronized {
+    private def trimExecutionIfNecessary() = {
       if (executionList.size > retainedStatements) {
         val toRemove = math.max(retainedStatements / 10, 1)
         executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach 
{ s =>
@@ -229,7 +243,7 @@ object HiveThriftServer2 extends Logging {
       }
     }
 
-    private def trimSessionIfNecessary() = synchronized {
+    private def trimSessionIfNecessary() = {
       if (sessionList.size > retainedSessions) {
         val toRemove = math.max(retainedSessions / 10, 1)
         sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { 
s =>

http://git-wip-us.apache.org/repos/asf/spark/blob/c8677d73/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index 10c83d8..e990bd0 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -39,14 +39,16 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) 
extends WebUIPage(""
   /** Render the page */
   def render(request: HttpServletRequest): Seq[Node] = {
     val content =
-      generateBasicStats() ++
-      <br/> ++
-      <h4>
-        {listener.onlineSessionNum} session(s) are online,
-        running {listener.totalRunning} SQL statement(s)
-      </h4> ++
-      generateSessionStatsTable() ++
-      generateSQLStatsTable()
+      listener.synchronized { // make sure all parts in this page are 
consistent
+        generateBasicStats() ++
+        <br/> ++
+        <h4>
+        {listener.getOnlineSessionNum} session(s) are online,
+        running {listener.getTotalRunning} SQL statement(s)
+        </h4> ++
+        generateSessionStatsTable() ++
+        generateSQLStatsTable()
+      }
     UIUtils.headerSparkPage("JDBC/ODBC Server", content, parent, Some(5000))
   }
 
@@ -65,11 +67,11 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) 
extends WebUIPage(""
 
   /** Generate stats of batch statements of the thrift server program */
   private def generateSQLStatsTable(): Seq[Node] = {
-    val numStatement = listener.executionList.size
+    val numStatement = listener.getExecutionList.size
     val table = if (numStatement > 0) {
       val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish 
Time", "Duration",
         "Statement", "State", "Detail")
-      val dataRows = listener.executionList.values
+      val dataRows = listener.getExecutionList
 
       def generateDataRow(info: ExecutionInfo): Seq[Node] = {
         val jobLink = info.jobId.map { id: String =>
@@ -136,15 +138,15 @@ private[ui] class ThriftServerPage(parent: 
ThriftServerTab) extends WebUIPage(""
 
   /** Generate stats of batch sessions of the thrift server program */
   private def generateSessionStatsTable(): Seq[Node] = {
-    val numBatches = listener.sessionList.size
+    val sessionList = listener.getSessionList
+    val numBatches = sessionList.size
     val table = if (numBatches > 0) {
-      val dataRows =
-        listener.sessionList.values
+      val dataRows = sessionList
       val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish 
Time", "Duration",
         "Total Execute")
       def generateDataRow(session: SessionInfo): Seq[Node] = {
-        val sessionLink = "%s/sql/session?id=%s"
-          .format(UIUtils.prependBaseUri(parent.basePath), session.sessionId)
+        val sessionLink = "%s/%s/session?id=%s"
+          .format(UIUtils.prependBaseUri(parent.basePath), parent.prefix, 
session.sessionId)
         <tr>
           <td> {session.userName} </td>
           <td> {session.ip} </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/c8677d73/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
index 3b01afa..af16cb3 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
@@ -40,21 +40,22 @@ private[ui] class ThriftServerSessionPage(parent: 
ThriftServerTab)
   def render(request: HttpServletRequest): Seq[Node] = {
     val parameterId = request.getParameter("id")
     require(parameterId != null && parameterId.nonEmpty, "Missing id 
parameter")
-    val sessionStat = listener.sessionList.find(stat => {
-      stat._1 == parameterId
-    }).getOrElse(null)
-    require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")
 
     val content =
-      generateBasicStats() ++
-      <br/> ++
-      <h4>
-        User {sessionStat._2.userName},
-        IP {sessionStat._2.ip},
-        Session created at {formatDate(sessionStat._2.startTimestamp)},
-        Total run {sessionStat._2.totalExecution} SQL
-      </h4> ++
-      generateSQLStatsTable(sessionStat._2.sessionId)
+      listener.synchronized { // make sure all parts in this page are 
consistent
+        val sessionStat = listener.getSession(parameterId).getOrElse(null)
+        require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")
+
+        generateBasicStats() ++
+        <br/> ++
+        <h4>
+        User {sessionStat.userName},
+        IP {sessionStat.ip},
+        Session created at {formatDate(sessionStat.startTimestamp)},
+        Total run {sessionStat.totalExecution} SQL
+        </h4> ++
+        generateSQLStatsTable(sessionStat.sessionId)
+      }
     UIUtils.headerSparkPage("JDBC/ODBC Session", content, parent, Some(5000))
   }
 
@@ -73,13 +74,13 @@ private[ui] class ThriftServerSessionPage(parent: 
ThriftServerTab)
 
   /** Generate stats of batch statements of the thrift server program */
   private def generateSQLStatsTable(sessionID: String): Seq[Node] = {
-    val executionList = listener.executionList
-      .filter(_._2.sessionId == sessionID)
+    val executionList = listener.getExecutionList
+      .filter(_.sessionId == sessionID)
     val numStatement = executionList.size
     val table = if (numStatement > 0) {
       val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish 
Time", "Duration",
         "Statement", "State", "Detail")
-      val dataRows = 
executionList.values.toSeq.sortBy(_.startTimestamp).reverse
+      val dataRows = executionList.sortBy(_.startTimestamp).reverse
 
       def generateDataRow(info: ExecutionInfo): Seq[Node] = {
         val jobLink = info.jobId.map { id: String =>
@@ -146,10 +147,11 @@ private[ui] class ThriftServerSessionPage(parent: 
ThriftServerTab)
 
   /** Generate stats of batch sessions of the thrift server program */
   private def generateSessionStatsTable(): Seq[Node] = {
-    val numBatches = listener.sessionList.size
+    val sessionList = listener.getSessionList
+    val numBatches = sessionList.size
     val table = if (numBatches > 0) {
       val dataRows =
-        listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map 
( session =>
+        sessionList.sortBy(_.startTimestamp).reverse.map ( session =>
         Seq(
           session.userName,
           session.ip,

http://git-wip-us.apache.org/repos/asf/spark/blob/c8677d73/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
index 94fd8a6..4eabeaa 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
@@ -27,9 +27,9 @@ import org.apache.spark.{SparkContext, Logging, 
SparkException}
  * This assumes the given SparkContext has enabled its SparkUI.
  */
 private[thriftserver] class ThriftServerTab(sparkContext: SparkContext)
-  extends SparkUITab(getSparkUI(sparkContext), "sql") with Logging {
+  extends SparkUITab(getSparkUI(sparkContext), "sqlserver") with Logging {
 
-  override val name = "SQL"
+  override val name = "JDBC/ODBC Server"
 
   val parent = getSparkUI(sparkContext)
   val listener = HiveThriftServer2.listener


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to