This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 687dd4e  [SPARK-28260][SQL] Add CLOSED state to ExecutionState
687dd4e is described below

commit 687dd4eb55739f802692b3c5457618fd6558e538
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Fri Jul 12 10:31:28 2019 -0700

    [SPARK-28260][SQL] Add CLOSED state to ExecutionState
    
    ## What changes were proposed in this pull request?
    
    The `ThriftServerTab` displays a FINISHED state when the operation finishes 
execution, but quite often it still takes a lot of time to fetch the results. 
OperationState has state CLOSED for when after the iterator is closed. This PR 
add CLOSED state to ExecutionState, and override the `close()` in 
SparkExecuteStatementOperation, SparkGetColumnsOperation, 
SparkGetSchemasOperation and SparkGetTablesOperation.
    
    ## How was this patch tested?
    
    manual tests
    1. Add `Thread.sleep(10000)` before 
[SparkExecuteStatementOperation.scala#L112](https://github.com/apache/spark/blob/b2e7677f4d3d8f47f5f148680af39d38f2b558f0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L112)
    2. Switch to `ThriftServerTab`:
    
![image](https://user-images.githubusercontent.com/5399861/60809590-9dcf2500-a1bd-11e9-826e-33729bb97daf.png)
    3. After a while:
    
![image](https://user-images.githubusercontent.com/5399861/60809719-e850a180-a1bd-11e9-9a6a-546146e626ab.png)
    
    Closes #25062 from wangyum/SPARK-28260.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../spark/sql/hive/thriftserver/HiveThriftServer2.scala    | 14 ++++++++++----
 .../hive/thriftserver/SparkExecuteStatementOperation.scala |  3 ++-
 .../sql/hive/thriftserver/SparkGetColumnsOperation.scala   |  9 ++++++++-
 .../sql/hive/thriftserver/SparkGetSchemasOperation.scala   |  9 ++++++++-
 .../sql/hive/thriftserver/SparkGetTablesOperation.scala    |  9 ++++++++-
 .../spark/sql/hive/thriftserver/ui/ThriftServerPage.scala  |  8 +++++---
 .../sql/hive/thriftserver/ui/ThriftServerSessionPage.scala |  8 +++++---
 7 files changed, 46 insertions(+), 14 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index d1de9f0..b4d1d0d 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -137,7 +137,7 @@ object HiveThriftServer2 extends Logging {
   }
 
   private[thriftserver] object ExecutionState extends Enumeration {
-    val STARTED, COMPILED, FAILED, FINISHED = Value
+    val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value
     type ExecutionState = Value
   }
 
@@ -147,16 +147,17 @@ object HiveThriftServer2 extends Logging {
       val startTimestamp: Long,
       val userName: String) {
     var finishTimestamp: Long = 0L
+    var closeTimestamp: Long = 0L
     var executePlan: String = ""
     var detail: String = ""
     var state: ExecutionState.Value = ExecutionState.STARTED
     val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
     var groupId: String = ""
-    def totalTime: Long = {
-      if (finishTimestamp == 0L) {
+    def totalTime(endTime: Long): Long = {
+      if (endTime == 0L) {
         System.currentTimeMillis - startTimestamp
       } else {
-        finishTimestamp - startTimestamp
+        endTime - startTimestamp
       }
     }
   }
@@ -254,6 +255,11 @@ object HiveThriftServer2 extends Logging {
       trimExecutionIfNecessary()
     }
 
+    def onOperationClosed(id: String): Unit = synchronized {
+      executionList(id).closeTimestamp = System.currentTimeMillis
+      executionList(id).state = ExecutionState.CLOSED
+    }
+
     private def trimExecutionIfNecessary() = {
       if (executionList.size > retainedStatements) {
         val toRemove = math.max(retainedStatements / 10, 1)
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 820f76d..2f011c2 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -70,11 +70,12 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  def close(): Unit = {
+  override def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
     logDebug(s"CLOSING $statementId")
     cleanup(OperationState.CLOSED)
     sqlContext.sparkContext.clearJobGroup()
+    HiveThriftServer2.listener.onOperationClosed(statementId)
   }
 
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: 
Int) {
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index 99ba968..89faff2 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -58,8 +58,15 @@ private[hive] class SparkGetColumnsOperation(
 
   val catalog: SessionCatalog = sqlContext.sessionState.catalog
 
+  private var statementId: String = _
+
+  override def close(): Unit = {
+    super.close()
+    HiveThriftServer2.listener.onOperationClosed(statementId)
+  }
+
   override def runInternal(): Unit = {
-    val statementId = UUID.randomUUID().toString
+    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, 
tablePattern : $tableName"
     val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'"
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
index 3ecbbd0..87ef154 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -45,8 +45,15 @@ private[hive] class SparkGetSchemasOperation(
     schemaName: String)
   extends GetSchemasOperation(parentSession, catalogName, schemaName) with 
Logging {
 
+  private var statementId: String = _
+
+  override def close(): Unit = {
+    super.close()
+    HiveThriftServer2.listener.onOperationClosed(statementId)
+  }
+
   override def runInternal(): Unit = {
-    val statementId = UUID.randomUUID().toString
+    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val logMsg = s"Listing databases '$cmdStr'"
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
index 8786836..952de42 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -55,8 +55,15 @@ private[hive] class SparkGetTablesOperation(
   extends GetTablesOperation(parentSession, catalogName, schemaName, 
tableName, tableTypes)
     with Logging{
 
+  private var statementId: String = _
+
+  override def close(): Unit = {
+    super.close()
+    HiveThriftServer2.listener.onOperationClosed(statementId)
+  }
+
   override def runInternal(): Unit = {
-    val statementId = UUID.randomUUID().toString
+    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val tableTypesStr = if (tableTypes == null) "null" else 
tableTypes.asScala.mkString(",")
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index 27d2c99..1747b5b 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -70,8 +70,8 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) 
extends WebUIPage(""
   private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = {
     val numStatement = listener.getExecutionList.size
     val table = if (numStatement > 0) {
-      val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish 
Time", "Duration",
-        "Statement", "State", "Detail")
+      val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish 
Time", "Close Time",
+        "Execution Time", "Duration", "Statement", "State", "Detail")
       val dataRows = listener.getExecutionList.sortBy(_.startTimestamp).reverse
 
       def generateDataRow(info: ExecutionInfo): Seq[Node] = {
@@ -90,7 +90,9 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) 
extends WebUIPage(""
           <td>{info.groupId}</td>
           <td>{formatDate(info.startTimestamp)}</td>
           <td>{if (info.finishTimestamp > 0) 
formatDate(info.finishTimestamp)}</td>
-          <td>{formatDurationOption(Some(info.totalTime))}</td>
+          <td>{if (info.closeTimestamp > 0) 
formatDate(info.closeTimestamp)}</td>
+          
<td>{formatDurationOption(Some(info.totalTime(info.finishTimestamp)))}</td>
+          
<td>{formatDurationOption(Some(info.totalTime(info.closeTimestamp)))}</td>
           <td>{info.statement}</td>
           <td>{info.state}</td>
           {errorMessageCell(detail)}
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
index fdc9bee..a45c6e3 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
@@ -79,8 +79,8 @@ private[ui] class ThriftServerSessionPage(parent: 
ThriftServerTab)
       .filter(_.sessionId == sessionID)
     val numStatement = executionList.size
     val table = if (numStatement > 0) {
-      val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish 
Time", "Duration",
-        "Statement", "State", "Detail")
+      val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish 
Time", "Close Time",
+        "Execution Time", "Duration", "Statement", "State", "Detail")
       val dataRows = executionList.sortBy(_.startTimestamp).reverse
 
       def generateDataRow(info: ExecutionInfo): Seq[Node] = {
@@ -99,7 +99,9 @@ private[ui] class ThriftServerSessionPage(parent: 
ThriftServerTab)
           <td>{info.groupId}</td>
           <td>{formatDate(info.startTimestamp)}</td>
           <td>{formatDate(info.finishTimestamp)}</td>
-          <td>{formatDurationOption(Some(info.totalTime))}</td>
+          <td>{formatDate(info.closeTimestamp)}</td>
+          
<td>{formatDurationOption(Some(info.totalTime(info.finishTimestamp)))}</td>
+          
<td>{formatDurationOption(Some(info.totalTime(info.closeTimestamp)))}</td>
           <td>{info.statement}</td>
           <td>{info.state}</td>
           {errorMessageCell(detail)}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to