This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 687dd4e [SPARK-28260][SQL] Add CLOSED state to ExecutionState
687dd4e is described below
commit 687dd4eb55739f802692b3c5457618fd6558e538
Author: Yuming Wang <[email protected]>
AuthorDate: Fri Jul 12 10:31:28 2019 -0700
[SPARK-28260][SQL] Add CLOSED state to ExecutionState
## What changes were proposed in this pull request?
The `ThriftServerTab` displays a FINISHED state when the operation finishes
execution, but quite often it still takes a lot of time to fetch the results.
OperationState has a CLOSED state for after the iterator is closed. This PR
adds a CLOSED state to ExecutionState and overrides `close()` in
SparkExecuteStatementOperation, SparkGetColumnsOperation,
SparkGetSchemasOperation and SparkGetTablesOperation.
## How was this patch tested?
manual tests
1. Add `Thread.sleep(10000)` before
[SparkExecuteStatementOperation.scala#L112](https://github.com/apache/spark/blob/b2e7677f4d3d8f47f5f148680af39d38f2b558f0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala#L112)
2. Switch to `ThriftServerTab`:

3. After a while:

Closes #25062 from wangyum/SPARK-28260.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
---
.../spark/sql/hive/thriftserver/HiveThriftServer2.scala | 14 ++++++++++----
.../hive/thriftserver/SparkExecuteStatementOperation.scala | 3 ++-
.../sql/hive/thriftserver/SparkGetColumnsOperation.scala | 9 ++++++++-
.../sql/hive/thriftserver/SparkGetSchemasOperation.scala | 9 ++++++++-
.../sql/hive/thriftserver/SparkGetTablesOperation.scala | 9 ++++++++-
.../spark/sql/hive/thriftserver/ui/ThriftServerPage.scala | 8 +++++---
.../sql/hive/thriftserver/ui/ThriftServerSessionPage.scala | 8 +++++---
7 files changed, 46 insertions(+), 14 deletions(-)
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index d1de9f0..b4d1d0d 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -137,7 +137,7 @@ object HiveThriftServer2 extends Logging {
}
private[thriftserver] object ExecutionState extends Enumeration {
- val STARTED, COMPILED, FAILED, FINISHED = Value
+ val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value
type ExecutionState = Value
}
@@ -147,16 +147,17 @@ object HiveThriftServer2 extends Logging {
val startTimestamp: Long,
val userName: String) {
var finishTimestamp: Long = 0L
+ var closeTimestamp: Long = 0L
var executePlan: String = ""
var detail: String = ""
var state: ExecutionState.Value = ExecutionState.STARTED
val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
var groupId: String = ""
- def totalTime: Long = {
- if (finishTimestamp == 0L) {
+ def totalTime(endTime: Long): Long = {
+ if (endTime == 0L) {
System.currentTimeMillis - startTimestamp
} else {
- finishTimestamp - startTimestamp
+ endTime - startTimestamp
}
}
}
@@ -254,6 +255,11 @@ object HiveThriftServer2 extends Logging {
trimExecutionIfNecessary()
}
+ def onOperationClosed(id: String): Unit = synchronized {
+ executionList(id).closeTimestamp = System.currentTimeMillis
+ executionList(id).state = ExecutionState.CLOSED
+ }
+
private def trimExecutionIfNecessary() = {
if (executionList.size > retainedStatements) {
val toRemove = math.max(retainedStatements / 10, 1)
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 820f76d..2f011c2 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -70,11 +70,12 @@ private[hive] class SparkExecuteStatementOperation(
}
}
- def close(): Unit = {
+ override def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug(s"CLOSING $statementId")
cleanup(OperationState.CLOSED)
sqlContext.sparkContext.clearJobGroup()
+ HiveThriftServer2.listener.onOperationClosed(statementId)
}
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal:
Int) {
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index 99ba968..89faff2 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -58,8 +58,15 @@ private[hive] class SparkGetColumnsOperation(
val catalog: SessionCatalog = sqlContext.sessionState.catalog
+ private var statementId: String = _
+
+ override def close(): Unit = {
+ super.close()
+ HiveThriftServer2.listener.onOperationClosed(statementId)
+ }
+
override def runInternal(): Unit = {
- val statementId = UUID.randomUUID().toString
+ statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName,
tablePattern : $tableName"
val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'"
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
index 3ecbbd0..87ef154 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -45,8 +45,15 @@ private[hive] class SparkGetSchemasOperation(
schemaName: String)
extends GetSchemasOperation(parentSession, catalogName, schemaName) with
Logging {
+ private var statementId: String = _
+
+ override def close(): Unit = {
+ super.close()
+ HiveThriftServer2.listener.onOperationClosed(statementId)
+ }
+
override def runInternal(): Unit = {
- val statementId = UUID.randomUUID().toString
+ statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
val logMsg = s"Listing databases '$cmdStr'"
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
index 8786836..952de42 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -55,8 +55,15 @@ private[hive] class SparkGetTablesOperation(
extends GetTablesOperation(parentSession, catalogName, schemaName,
tableName, tableTypes)
with Logging{
+ private var statementId: String = _
+
+ override def close(): Unit = {
+ super.close()
+ HiveThriftServer2.listener.onOperationClosed(statementId)
+ }
+
override def runInternal(): Unit = {
- val statementId = UUID.randomUUID().toString
+ statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
val tableTypesStr = if (tableTypes == null) "null" else
tableTypes.asScala.mkString(",")
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index 27d2c99..1747b5b 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -70,8 +70,8 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab)
extends WebUIPage(""
private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = {
val numStatement = listener.getExecutionList.size
val table = if (numStatement > 0) {
- val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish
Time", "Duration",
- "Statement", "State", "Detail")
+ val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish
Time", "Close Time",
+ "Execution Time", "Duration", "Statement", "State", "Detail")
val dataRows = listener.getExecutionList.sortBy(_.startTimestamp).reverse
def generateDataRow(info: ExecutionInfo): Seq[Node] = {
@@ -90,7 +90,9 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab)
extends WebUIPage(""
<td>{info.groupId}</td>
<td>{formatDate(info.startTimestamp)}</td>
<td>{if (info.finishTimestamp > 0)
formatDate(info.finishTimestamp)}</td>
- <td>{formatDurationOption(Some(info.totalTime))}</td>
+ <td>{if (info.closeTimestamp > 0)
formatDate(info.closeTimestamp)}</td>
+
<td>{formatDurationOption(Some(info.totalTime(info.finishTimestamp)))}</td>
+
<td>{formatDurationOption(Some(info.totalTime(info.closeTimestamp)))}</td>
<td>{info.statement}</td>
<td>{info.state}</td>
{errorMessageCell(detail)}
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
index fdc9bee..a45c6e3 100644
---
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
+++
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
@@ -79,8 +79,8 @@ private[ui] class ThriftServerSessionPage(parent:
ThriftServerTab)
.filter(_.sessionId == sessionID)
val numStatement = executionList.size
val table = if (numStatement > 0) {
- val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish
Time", "Duration",
- "Statement", "State", "Detail")
+ val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish
Time", "Close Time",
+ "Execution Time", "Duration", "Statement", "State", "Detail")
val dataRows = executionList.sortBy(_.startTimestamp).reverse
def generateDataRow(info: ExecutionInfo): Seq[Node] = {
@@ -99,7 +99,9 @@ private[ui] class ThriftServerSessionPage(parent:
ThriftServerTab)
<td>{info.groupId}</td>
<td>{formatDate(info.startTimestamp)}</td>
<td>{formatDate(info.finishTimestamp)}</td>
- <td>{formatDurationOption(Some(info.totalTime))}</td>
+ <td>{formatDate(info.closeTimestamp)}</td>
+
<td>{formatDurationOption(Some(info.totalTime(info.finishTimestamp)))}</td>
+
<td>{formatDurationOption(Some(info.totalTime(info.closeTimestamp)))}</td>
<td>{info.statement}</td>
<td>{info.state}</td>
{errorMessageCell(detail)}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]