This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b1369b4abed [SPARK-50499][PYTHON] Expose metrics from BasePythonRunner
2b1369b4abed is described below

commit 2b1369b4abed643d986ee6a22ec86371eadb831e
Author: Sebastian Hillig <[email protected]>
AuthorDate: Thu Dec 19 08:47:21 2024 +0900

    [SPARK-50499][PYTHON] Expose metrics from BasePythonRunner
    
    ### What changes were proposed in this pull request?
    
    Expose additional metrics from BasePythonRunner for all Python UDF 
executions: bootTime, initTime, and totalTime.
    
    ### Why are the changes needed?
    
    These metrics are currently only exposed in logs, but they can be helpful 
for debugging slow queries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Extended existing test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49076 from sebastianhillig-db/add-basepythonrunner-metrics.
    
    Lead-authored-by: Sebastian Hillig 
<[email protected]>
    Co-authored-by: Sebastian Hillig <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  8 ++++++--
 .../execution/datasources/v2/python/PythonCustomMetric.scala   |  2 ++
 .../execution/python/ApplyInPandasWithStatePythonRunner.scala  |  3 ++-
 .../apache/spark/sql/execution/python/ArrowPythonRunner.scala  |  2 +-
 .../spark/sql/execution/python/ArrowPythonUDTFRunner.scala     |  2 +-
 .../sql/execution/python/CoGroupedArrowPythonRunner.scala      |  2 +-
 .../apache/spark/sql/execution/python/PythonSQLMetrics.scala   | 10 ++++++++++
 .../apache/spark/sql/execution/python/PythonUDFRunner.scala    |  2 +-
 .../python/TransformWithStateInPandasPythonRunner.scala        |  3 ++-
 .../org/apache/spark/sql/execution/python/PythonUDFSuite.scala |  5 ++++-
 10 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index b7fb22bab844..a34cce980ae4 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -109,7 +109,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     protected val funcs: Seq[ChainedPythonFunctions],
     protected val evalType: Int,
     protected val argOffsets: Array[Array[Int]],
-    protected val jobArtifactUUID: Option[String])
+    protected val jobArtifactUUID: Option[String],
+    protected val metrics: Map[String, AccumulatorV2[Long, Long]])
   extends Logging {
 
   require(funcs.length == argOffsets.length, "argOffsets should have the same 
length as funcs")
@@ -522,6 +523,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         log"boot = ${MDC(LogKeys.BOOT_TIME, boot)}, " +
         log"init = ${MDC(LogKeys.INIT_TIME, init)}, " +
         log"finish = ${MDC(LogKeys.FINISH_TIME, finish)}")
+      metrics.get("pythonBootTime").foreach(_.add(boot))
+      metrics.get("pythonInitTime").foreach(_.add(init))
+      metrics.get("pythonTotalTime").foreach(_.add(total))
       val memoryBytesSpilled = stream.readLong()
       val diskBytesSpilled = stream.readLong()
       context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
@@ -824,7 +828,7 @@ private[spark] object PythonRunner {
 private[spark] class PythonRunner(
     funcs: Seq[ChainedPythonFunctions], jobArtifactUUID: Option[String])
   extends BasePythonRunner[Array[Byte], Array[Byte]](
-    funcs, PythonEvalType.NON_UDF, Array(Array(0)), jobArtifactUUID) {
+    funcs, PythonEvalType.NON_UDF, Array(Array(0)), jobArtifactUUID, 
Map.empty) {
 
   protected override def newWriter(
       env: SparkEnv,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonCustomMetric.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonCustomMetric.scala
index bca1cbed7e70..7551cd04f20f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonCustomMetric.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonCustomMetric.scala
@@ -45,6 +45,8 @@ object PythonCustomMetric {
     // See also `UserDefinedPythonDataSource.createPythonMetrics`.
     PythonSQLMetrics.pythonSizeMetricsDesc.keys
       .map(_ -> new SQLMetric("size", -1)).toMap ++
+      PythonSQLMetrics.pythonTimingMetricsDesc.keys
+        .map(_ -> new SQLMetric("timing", -1)).toMap ++
       PythonSQLMetrics.pythonOtherMetricsDesc.keys
         .map(_ -> new SQLMetric("sum", -1)).toMap
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
index ae982f2f87f2..d704638b85e8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
@@ -65,7 +65,8 @@ class ApplyInPandasWithStatePythonRunner(
     stateValueSchema: StructType,
     override val pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String])
-  extends BasePythonRunner[InType, OutType](funcs.map(_._1), evalType, 
argOffsets, jobArtifactUUID)
+  extends BasePythonRunner[InType, OutType](
+    funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
   with PythonArrowInput[InType]
   with PythonArrowOutput[OutType] {
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 72e9c5210194..579b49604685 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -38,7 +38,7 @@ abstract class BaseArrowPythonRunner(
     override val pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String])
   extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
-    funcs.map(_._1), evalType, argOffsets, jobArtifactUUID)
+    funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
   with BasicPythonArrowInput
   with BasicPythonArrowOutput {
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index f52b01b6646a..99a9e706c662 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -43,7 +43,7 @@ class ArrowPythonUDTFRunner(
     jobArtifactUUID: Option[String])
   extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
       Seq(ChainedPythonFunctions(Seq(udtf.func))), evalType, 
Array(argMetas.map(_.offset)),
-      jobArtifactUUID)
+      jobArtifactUUID, pythonMetrics)
   with BasicPythonArrowInput
   with BasicPythonArrowOutput {
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index 5670cad67e7b..c5e86d010938 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -51,7 +51,7 @@ class CoGroupedArrowPythonRunner(
     profiler: Option[String])
   extends BasePythonRunner[
     (Iterator[InternalRow], Iterator[InternalRow]), ColumnarBatch](
-    funcs.map(_._1), evalType, argOffsets, jobArtifactUUID)
+    funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
   with BasicPythonArrowOutput {
 
   override val pythonExec: String =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
index 4df6d821c014..bd22739613ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
@@ -24,6 +24,8 @@ trait PythonSQLMetrics { self: SparkPlan =>
   protected val pythonMetrics: Map[String, SQLMetric] = {
     PythonSQLMetrics.pythonSizeMetricsDesc.map { case (k, v) =>
       k -> SQLMetrics.createSizeMetric(sparkContext, v)
+    } ++ PythonSQLMetrics.pythonTimingMetricsDesc.map { case (k, v) =>
+      k -> SQLMetrics.createTimingMetric(sparkContext, v)
     } ++ PythonSQLMetrics.pythonOtherMetricsDesc.map { case (k, v) =>
       k -> SQLMetrics.createMetric(sparkContext, v)
     }
@@ -40,6 +42,14 @@ object PythonSQLMetrics {
     )
   }
 
+  val pythonTimingMetricsDesc: Map[String, String] = {
+    Map(
+      "pythonBootTime" -> "total time to start Python workers",
+      "pythonInitTime" -> "total time to initialize Python workers",
+      "pythonTotalTime" -> "total time to run Python workers"
+    )
+  }
+
   val pythonOtherMetricsDesc: Map[String, String] = {
     Map("pythonNumRowsReceived" -> "number of output rows")
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 87ff5a0ec433..92310aa755db 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -36,7 +36,7 @@ abstract class BasePythonUDFRunner(
     pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String])
   extends BasePythonRunner[Array[Byte], Array[Byte]](
-    funcs.map(_._1), evalType, argOffsets, jobArtifactUUID) {
+    funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics) {
 
   override val pythonExec: String =
     SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
index c5980012124f..641e49657ca9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
@@ -173,7 +173,8 @@ abstract class 
TransformWithStateInPandasPythonBaseRunner[I](
     groupingKeySchema: StructType,
     batchTimestampMs: Option[Long],
     eventTimeWatermarkForEviction: Option[Long])
-  extends BasePythonRunner[I, ColumnarBatch](funcs.map(_._1), evalType, 
argOffsets, jobArtifactUUID)
+  extends BasePythonRunner[I, ColumnarBatch](
+    funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
   with PythonArrowInput[I]
   with BasicPythonArrowOutput
   with Logging {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 4b46331be107..2f44994c301b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -91,7 +91,10 @@ class PythonUDFSuite extends QueryTest with 
SharedSparkSession {
     val pythonSQLMetrics = List(
       "data sent to Python workers",
       "data returned from Python workers",
-      "number of output rows")
+      "number of output rows",
+      "total time to initialize Python workers",
+      "total time to start Python workers",
+      "total time to run Python workers")
 
     val df = base.groupBy(pythonTestUDF(base("a") + 1))
       .agg(pythonTestUDF(pythonTestUDF(base("a") + 1)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to