This is an automated email from the ASF dual-hosted git repository.

csy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bc28fde6 [KYUUBI #6107] [Spark] Collect and summarize the 
`executorRunTime` and `executorCpuTime` of the statement
3bc28fde6 is described below

commit 3bc28fde65ee41e6884b0bef0d963f60a016d8c1
Author: bkhan <[email protected]>
AuthorDate: Mon Mar 4 11:22:42 2024 +0800

    [KYUUBI #6107] [Spark] Collect and summarize the `executorRunTime` and 
`executorCpuTime` of the statement
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #6107
    
    ## Describe Your Solution 🔧
    
    The total execution time of a statement (or a session) is the summary of 
the execution time of the stages belonging to the statement (or session).
    The total execution time of a stage is collected from 
`SQLOperationListener#onStageCompleted`.
    The total execution times of the statement or a session are stored in the 
engine events or output to the log.
    
    <img width="962" alt="截屏2024-02-29 14 47 50" 
src="https://github.com/apache/kyuubi/assets/23011702/176df1db-bb20-428b-94b8-fa02c946fde2">
    <img width="1143" alt="截屏2024-02-29 14 47 21" 
src="https://github.com/apache/kyuubi/assets/23011702/8cfc6a72-f6e8-45b6-bdda-30296c94c893">
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6112 from XorSum/features/spark-engine-cpu-time-collect.
    
    Closes #6107
    
    802800566 [bkhan] check same group
    d9efa2d22 [bkhan] formatDuration
    a8841cd8f [bkhan] update
    25071597e [bkhan] Apply suggestions from code review
    cfed2b9df [bkhan] use formatDurationVerbose
    444d4aab5 [bkhan] Collect and summarize the executorRunTime and 
executorCpuTime of the statement
    
    Authored-by: bkhan <[email protected]>
    Signed-off-by: Shaoyun Chen <[email protected]>
---
 .../kyuubi/engine/spark/events/SessionEvent.scala  |  4 ++-
 .../engine/spark/events/SparkOperationEvent.scala  | 12 ++++++---
 .../engine/spark/operation/SparkOperation.scala    | 16 +++++++++++-
 .../engine/spark/session/SparkSessionImpl.scala    | 18 ++++++++++++++
 .../apache/spark/kyuubi/SQLOperationListener.scala | 17 +++++++++++++
 .../org/apache/spark/ui/SparkUIUtilsHelper.scala   | 29 ++++++++++++++++++++++
 6 files changed, 91 insertions(+), 5 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
index 2d6649d25..610b1645c 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
@@ -47,7 +47,9 @@ case class SessionEvent(
     conf: Map[String, String],
     startTime: Long,
     var endTime: Long = -1L,
-    var totalOperations: Int = 0) extends KyuubiEvent with SparkListenerEvent {
+    var totalOperations: Int = 0,
+    var sessionRunTime: Long = 0,
+    var sessionCpuTime: Long = 0) extends KyuubiEvent with SparkListenerEvent {
 
   override lazy val partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
index 319e1c09c..4b529cdaf 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
@@ -59,7 +59,9 @@ case class SparkOperationEvent(
     exception: Option[Throwable],
     sessionId: String,
     sessionUser: String,
-    executionId: Option[Long]) extends KyuubiEvent with SparkListenerEvent {
+    executionId: Option[Long],
+    operationRunTime: Option[Long],
+    operationCpuTime: Option[Long]) extends KyuubiEvent with 
SparkListenerEvent {
 
   override def partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
@@ -79,7 +81,9 @@ case class SparkOperationEvent(
 object SparkOperationEvent {
   def apply(
       operation: SparkOperation,
-      executionId: Option[Long] = None): SparkOperationEvent = {
+      executionId: Option[Long] = None,
+      operationRunTime: Option[Long] = None,
+      operationCpuTime: Option[Long] = None): SparkOperationEvent = {
     val session = operation.getSession
     val status = operation.getStatus
     new SparkOperationEvent(
@@ -94,6 +98,8 @@ object SparkOperationEvent {
       status.exception,
       session.handle.identifier.toString,
       session.user,
-      executionId)
+      executionId,
+      operationRunTime,
+      operationCpuTime)
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 88ebc306b..da65d9f64 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -25,6 +25,7 @@ import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.apache.spark.ui.SparkUIUtilsHelper.formatDuration
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
@@ -124,7 +125,20 @@ abstract class SparkOperation(session: Session)
   override protected def setState(newState: OperationState): Unit = {
     super.setState(newState)
     if (eventEnabled) {
-      EventBus.post(SparkOperationEvent(this, 
operationListener.flatMap(_.getExecutionId)))
+      EventBus.post(SparkOperationEvent(
+        this,
+        operationListener.flatMap(_.getExecutionId),
+        operationListener.map(_.getOperationRunTime),
+        operationListener.map(_.getOperationCpuTime)))
+      if (OperationState.isTerminal(newState)) {
+        operationListener.foreach(l => {
+          info(s"statementId=${statementId}, " +
+            s"operationRunTime=${formatDuration(l.getOperationRunTime)}, " +
+            s"operationCpuTime=${formatDuration(l.getOperationCpuTime / 
1000000)}")
+          
session.asInstanceOf[SparkSessionImpl].increaseRunTime(l.getOperationRunTime)
+          
session.asInstanceOf[SparkSessionImpl].increaseCpuTime(l.getOperationCpuTime)
+        })
+      }
     }
   }
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 08bd09b44..e899c28c3 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -17,8 +17,11 @@
 
 package org.apache.kyuubi.engine.spark.session
 
+import java.util.concurrent.atomic.AtomicLong
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.ui.SparkUIUtilsHelper.formatDuration
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
@@ -43,6 +46,8 @@ class SparkSessionImpl(
 
   override val handle: SessionHandle =
     
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+  private val sessionRunTime = new AtomicLong(0)
+  private val sessionCpuTime = new AtomicLong(0)
 
   private def setModifiableConfig(key: String, value: String): Unit = {
     try {
@@ -110,7 +115,12 @@ class SparkSessionImpl(
   }
 
   override def close(): Unit = {
+    info(s"sessionId=${sessionEvent.sessionId}, " +
+      s"sessionRunTime=${formatDuration(sessionRunTime.get())}, " +
+      s"sessionCpuTime=${formatDuration(sessionCpuTime.get() / 1000000)}")
     sessionEvent.endTime = System.currentTimeMillis()
+    sessionEvent.sessionRunTime = sessionRunTime.get()
+    sessionEvent.sessionCpuTime = sessionCpuTime.get()
     EventBus.post(sessionEvent)
     super.close()
     
spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable)
@@ -118,4 +128,12 @@ class SparkSessionImpl(
     
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closePythonProcess(
       handle)
   }
+
+  def increaseRunTime(time: Long): Unit = {
+    sessionRunTime.getAndAdd(time)
+  }
+
+  def increaseCpuTime(time: Long): Unit = {
+    sessionCpuTime.getAndAdd(time)
+  }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index a7d409c7c..4342c0c7f 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -19,12 +19,14 @@ package org.apache.spark.kyuubi
 
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
+import org.apache.spark.ui.UIUtils.formatDuration
 
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_SHOW_PROGRESS, 
ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT, 
ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL}
@@ -61,6 +63,13 @@ class SQLOperationListener(
       None
     }
 
+  private val operationRunTime = new AtomicLong(0)
+  private val operationCpuTime = new AtomicLong(0)
+
+  def getOperationRunTime: Long = operationRunTime.get()
+
+  def getOperationCpuTime: Long = operationCpuTime.get()
+
   def getExecutionId: Option[Long] = executionId
 
   // For broadcast, Spark will introduce a new runId as SPARK_JOB_GROUP_ID, 
see:
@@ -150,6 +159,14 @@ class SQLOperationListener(
               }
             }
         }
+        val taskMetrics = stageInfo.taskMetrics
+        if (taskMetrics != null) {
+          info(s"stageId=${stageCompleted.stageInfo.stageId}, " +
+            s"stageRunTime=${formatDuration(taskMetrics.executorRunTime)}, " +
+            s"stageCpuTime=${formatDuration(taskMetrics.executorCpuTime / 
1000000)}")
+          operationRunTime.getAndAdd(taskMetrics.executorRunTime)
+          operationCpuTime.getAndAdd(taskMetrics.executorCpuTime)
+        }
         withOperationLog(super.onStageCompleted(stageCompleted))
       }
     }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/SparkUIUtilsHelper.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/SparkUIUtilsHelper.scala
new file mode 100644
index 000000000..c60fe4466
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/SparkUIUtilsHelper.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+/**
+ * A place to invoke non-public APIs of [[UIUtils]], anything to be added here 
need to
+ * think twice
+ */
+object SparkUIUtilsHelper {
+
+  def formatDuration(ms: Long): String = {
+    UIUtils.formatDuration(ms)
+  }
+}

Reply via email to