This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3bc28fde6 [KYUUBI #6107] [Spark] Collect and summarize the
`executorRunTime` and `executorCpuTime` of the statement
3bc28fde6 is described below
commit 3bc28fde65ee41e6884b0bef0d963f60a016d8c1
Author: bkhan <[email protected]>
AuthorDate: Mon Mar 4 11:22:42 2024 +0800
[KYUUBI #6107] [Spark] Collect and summarize the `executorRunTime` and
`executorCpuTime` of the statement
# :mag: Description
## Issue References 🔗
This pull request fixes #6107
## Describe Your Solution 🔧
The total execution time of a statement (or a session) is the sum of
the execution times of the stages belonging to that statement (or session).
The executor run time and executor CPU time of each completed stage are
collected from the stage's task metrics in `SQLOperationListener#onStageCompleted`.
The resulting totals for the statement or session are recorded in the
engine events and also written to the engine log.
<img width="962" alt="截屏2024-02-29 14 47 50"
src="https://github.com/apache/kyuubi/assets/23011702/176df1db-bb20-428b-94b8-fa02c946fde2">
<img width="1143" alt="截屏2024-02-29 14 47 21"
src="https://github.com/apache/kyuubi/assets/23011702/8cfc6a72-f6e8-45b6-bdda-30296c94c893">
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6112 from XorSum/features/spark-engine-cpu-time-collect.
Closes #6107
802800566 [bkhan] check same group
d9efa2d22 [bkhan] formatDuration
a8841cd8f [bkhan] update
25071597e [bkhan] Apply suggestions from code review
cfed2b9df [bkhan] use formatDurationVerbose
444d4aab5 [bkhan] Collect and summarize the executorRunTime and
executorCpuTime of the statement
Authored-by: bkhan <[email protected]>
Signed-off-by: Shaoyun Chen <[email protected]>
---
.../kyuubi/engine/spark/events/SessionEvent.scala | 4 ++-
.../engine/spark/events/SparkOperationEvent.scala | 12 ++++++---
.../engine/spark/operation/SparkOperation.scala | 16 +++++++++++-
.../engine/spark/session/SparkSessionImpl.scala | 18 ++++++++++++++
.../apache/spark/kyuubi/SQLOperationListener.scala | 17 +++++++++++++
.../org/apache/spark/ui/SparkUIUtilsHelper.scala | 29 ++++++++++++++++++++++
6 files changed, 91 insertions(+), 5 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
index 2d6649d25..610b1645c 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
@@ -47,7 +47,9 @@ case class SessionEvent(
conf: Map[String, String],
startTime: Long,
var endTime: Long = -1L,
- var totalOperations: Int = 0) extends KyuubiEvent with SparkListenerEvent {
+ var totalOperations: Int = 0,
+ var sessionRunTime: Long = 0,
+ var sessionCpuTime: Long = 0) extends KyuubiEvent with SparkListenerEvent {
override lazy val partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
index 319e1c09c..4b529cdaf 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
@@ -59,7 +59,9 @@ case class SparkOperationEvent(
exception: Option[Throwable],
sessionId: String,
sessionUser: String,
- executionId: Option[Long]) extends KyuubiEvent with SparkListenerEvent {
+ executionId: Option[Long],
+ operationRunTime: Option[Long],
+ operationCpuTime: Option[Long]) extends KyuubiEvent with
SparkListenerEvent {
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
@@ -79,7 +81,9 @@ case class SparkOperationEvent(
object SparkOperationEvent {
def apply(
operation: SparkOperation,
- executionId: Option[Long] = None): SparkOperationEvent = {
+ executionId: Option[Long] = None,
+ operationRunTime: Option[Long] = None,
+ operationCpuTime: Option[Long] = None): SparkOperationEvent = {
val session = operation.getSession
val status = operation.getStatus
new SparkOperationEvent(
@@ -94,6 +98,8 @@ object SparkOperationEvent {
status.exception,
session.handle.identifier.toString,
session.user,
- executionId)
+ executionId,
+ operationRunTime,
+ operationCpuTime)
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 88ebc306b..da65d9f64 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -25,6 +25,7 @@ import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.apache.spark.ui.SparkUIUtilsHelper.formatDuration
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
@@ -124,7 +125,20 @@ abstract class SparkOperation(session: Session)
override protected def setState(newState: OperationState): Unit = {
super.setState(newState)
if (eventEnabled) {
- EventBus.post(SparkOperationEvent(this,
operationListener.flatMap(_.getExecutionId)))
+ EventBus.post(SparkOperationEvent(
+ this,
+ operationListener.flatMap(_.getExecutionId),
+ operationListener.map(_.getOperationRunTime),
+ operationListener.map(_.getOperationCpuTime)))
+ if (OperationState.isTerminal(newState)) {
+ operationListener.foreach(l => {
+ info(s"statementId=${statementId}, " +
+ s"operationRunTime=${formatDuration(l.getOperationRunTime)}, " +
+ s"operationCpuTime=${formatDuration(l.getOperationCpuTime /
1000000)}")
+
session.asInstanceOf[SparkSessionImpl].increaseRunTime(l.getOperationRunTime)
+
session.asInstanceOf[SparkSessionImpl].increaseCpuTime(l.getOperationCpuTime)
+ })
+ }
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 08bd09b44..e899c28c3 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -17,8 +17,11 @@
package org.apache.kyuubi.engine.spark.session
+import java.util.concurrent.atomic.AtomicLong
+
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.ui.SparkUIUtilsHelper.formatDuration
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
@@ -43,6 +46,8 @@ class SparkSessionImpl(
override val handle: SessionHandle =
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+ private val sessionRunTime = new AtomicLong(0)
+ private val sessionCpuTime = new AtomicLong(0)
private def setModifiableConfig(key: String, value: String): Unit = {
try {
@@ -110,7 +115,12 @@ class SparkSessionImpl(
}
override def close(): Unit = {
+ info(s"sessionId=${sessionEvent.sessionId}, " +
+ s"sessionRunTime=${formatDuration(sessionRunTime.get())}, " +
+ s"sessionCpuTime=${formatDuration(sessionCpuTime.get() / 1000000)}")
sessionEvent.endTime = System.currentTimeMillis()
+ sessionEvent.sessionRunTime = sessionRunTime.get()
+ sessionEvent.sessionCpuTime = sessionCpuTime.get()
EventBus.post(sessionEvent)
super.close()
spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable)
@@ -118,4 +128,12 @@ class SparkSessionImpl(
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closePythonProcess(
handle)
}
+
+ def increaseRunTime(time: Long): Unit = {
+ sessionRunTime.getAndAdd(time)
+ }
+
+ def increaseCpuTime(time: Long): Unit = {
+ sessionCpuTime.getAndAdd(time)
+ }
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index a7d409c7c..4342c0c7f 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -19,12 +19,14 @@ package org.apache.spark.kyuubi
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import org.apache.spark.scheduler._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
+import org.apache.spark.ui.UIUtils.formatDuration
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS,
ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT,
ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
@@ -61,6 +63,13 @@ class SQLOperationListener(
None
}
+ private val operationRunTime = new AtomicLong(0)
+ private val operationCpuTime = new AtomicLong(0)
+
+ def getOperationRunTime: Long = operationRunTime.get()
+
+ def getOperationCpuTime: Long = operationCpuTime.get()
+
def getExecutionId: Option[Long] = executionId
// For broadcast, Spark will introduce a new runId as SPARK_JOB_GROUP_ID,
see:
@@ -150,6 +159,14 @@ class SQLOperationListener(
}
}
}
+ val taskMetrics = stageInfo.taskMetrics
+ if (taskMetrics != null) {
+ info(s"stageId=${stageCompleted.stageInfo.stageId}, " +
+ s"stageRunTime=${formatDuration(taskMetrics.executorRunTime)}, " +
+ s"stageCpuTime=${formatDuration(taskMetrics.executorCpuTime /
1000000)}")
+ operationRunTime.getAndAdd(taskMetrics.executorRunTime)
+ operationCpuTime.getAndAdd(taskMetrics.executorCpuTime)
+ }
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/SparkUIUtilsHelper.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/SparkUIUtilsHelper.scala
new file mode 100644
index 000000000..c60fe4466
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/SparkUIUtilsHelper.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+/**
+ * Gateway to the non-public APIs of [[UIUtils]]. It is declared in `org.apache.spark.ui`
+ * so that it can reach them; think twice before adding anything here.
+ */
+object SparkUIUtilsHelper {
+
+  /** Renders a duration of `ms` milliseconds using Spark's `UIUtils.formatDuration`. */
+  def formatDuration(ms: Long): String =
+    UIUtils.formatDuration(ms)
+}