This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aa10c87dd319 [SPARK-55046][PYTHON] PySpark add udf processing time 
metric
aa10c87dd319 is described below

commit aa10c87dd3194b0866c8e34884875bd63aff7de7
Author: Eddie Bkheet <[email protected]>
AuthorDate: Fri Jan 23 13:34:27 2026 -0800

    [SPARK-55046][PYTHON] PySpark add udf processing time metric
    
    ### What changes were proposed in this pull request?
    
    Adds a new **pythonProcessingTime** metric to capture the actual Python 
code execution time in Python UDFs, UDTFs, and Python-based operations, 
separate from worker boot and initialization overhead.
    
    ### Why are the changes needed?
    
    Motivation: the existing pythonTotalTime metric includes:
    
    - Python worker boot time (pythonBootTime)
    - Python worker initialization time (pythonInitTime)
    - Actual Python code execution time
    
    This makes it difficult to identify whether performance issues are due to 
worker startup overhead or the UDF logic itself. The new pythonProcessingTime 
metric isolates just the Python code execution time for better observability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. An additional metric, pythonProcessingTime ("time to execute Python code"), will be displayed in the Spark UI.
    
    ### How was this patch tested?
    
    New unit tests in PythonUDFSuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes
    
    Closes #53831 from eddiebkheet/user/eddiebkheet/metrics/udf-processing-time.
    
    Authored-by: Eddie Bkheet <[email protected]>
    Signed-off-by: Takuya Ueshin <[email protected]>
---
 .../org/apache/spark/api/python/PythonRunner.scala |   2 +
 python/pyspark/worker.py                           |   7 +-
 .../sql/execution/python/PythonSQLMetrics.scala    |   3 +-
 .../sql/execution/python/PythonUDFSuite.scala      | 109 +++++++++++++++++++++
 4 files changed, 118 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 5015dafbbb2d..f830dd2d8b6e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -625,6 +625,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       val bootTime = stream.readLong()
       val initTime = stream.readLong()
       val finishTime = stream.readLong()
+      val processingTimeMs = stream.readLong()
       val boot = bootTime - startTime
       val init = initTime - bootTime
       val finish = finishTime - initTime
@@ -647,6 +648,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       metrics.get("pythonBootTime").foreach(_.add(boot))
       metrics.get("pythonInitTime").foreach(_.add(init))
       metrics.get("pythonTotalTime").foreach(_.add(total))
+      metrics.get("pythonProcessingTime").foreach(_.add(processingTimeMs))
       val memoryBytesSpilled = stream.readLong()
       val diskBytesSpilled = stream.readLong()
       context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 0df7eaf709c8..2e79c981d818 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -201,11 +201,12 @@ class RunnerConf:
         return self.get("spark.sql.pyspark.udf.profiler", None)
 
 
-def report_times(outfile, boot, init, finish):
+def report_times(outfile, boot, init, finish, processing_time_ms):
     write_int(SpecialLengths.TIMING_DATA, outfile)
     write_long(int(1000 * boot), outfile)
     write_long(int(1000 * init), outfile)
     write_long(int(1000 * finish), outfile)
+    write_long(processing_time_ms, outfile)
 
 
 def chain(f, g):
@@ -3429,11 +3430,13 @@ def main(infile, outfile):
                 if hasattr(out_iter, "close"):
                     out_iter.close()
 
+        processing_start_time = time.time()
         with capture_outputs():
             if profiler:
                 profiler.profile(process)
             else:
                 process()
+        processing_time_ms = int(1000 * (time.time() - processing_start_time))
 
         # Reset task context to None. This is a guard code to avoid residual 
context when worker
         # reuse.
@@ -3443,7 +3446,7 @@ def main(infile, outfile):
         handle_worker_exception(e, outfile)
         sys.exit(-1)
     finish_time = time.time()
-    report_times(outfile, boot_time, init_time, finish_time)
+    report_times(outfile, boot_time, init_time, finish_time, 
processing_time_ms)
     write_long(shuffle.MemoryBytesSpilled, outfile)
     write_long(shuffle.DiskBytesSpilled, outfile)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
index 43a47d1f7d90..cbce07977f16 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
@@ -46,7 +46,8 @@ object PythonSQLMetrics {
     Map(
       "pythonBootTime" -> "time to start Python workers",
       "pythonInitTime" -> "time to initialize Python workers",
-      "pythonTotalTime" -> "time to run Python workers"
+      "pythonTotalTime" -> "time to run Python workers",
+      "pythonProcessingTime" -> "time to execute Python code"
     )
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 9b40226c2049..6d5c459514e5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
 
 import org.apache.spark.sql.{AnalysisException, IntegratedUDFTestUtils, 
QueryTest, Row}
 import org.apache.spark.sql.functions.{array, avg, col, count, transform}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.LongType
 
@@ -156,4 +157,112 @@ class PythonUDFSuite extends QueryTest with 
SharedSparkSession {
 
     checkAnswer(df, Row(0, 0.0, 0))
   }
+
+  test("SPARK-55046: pythonProcessingTime metric is available for Python 
UDFs") {
+    assume(shouldTestPythonUDFs)
+    val pythonSQLMetrics = List(
+      "data sent to Python workers",
+      "data returned from Python workers",
+      "number of output rows",
+      "time to initialize Python workers",
+      "time to start Python workers",
+      "time to run Python workers",
+      "time to execute Python code")
+
+    val df = base.groupBy(pythonTestUDF(base("a") + 1))
+      .agg(pythonTestUDF(pythonTestUDF(base("a") + 1)))
+    df.count()
+
+    val statusStore = spark.sharedState.statusStore
+    val lastExecId = statusStore.executionsList().last.executionId
+    val executionMetrics = 
statusStore.execution(lastExecId).get.metrics.mkString
+    for (metric <- pythonSQLMetrics) {
+      assert(executionMetrics.contains(metric),
+        s"Expected metric '$metric' not found in execution metrics")
+    }
+  }
+
+  test(
+    "SPARK-55046: pythonProcessingTime reflects actual UDF computation time"
+  ) {
+    assume(shouldTestPythonUDFs)
+    val df = spark.range(10000)
+    val result = df.select(pythonTestUDF(col("id")))
+    result.collect()
+
+    val pythonExec = result.queryExecution.executedPlan.collectFirst {
+      case p: BatchEvalPythonExec => p
+    }.getOrElse {
+      fail("Expected BatchEvalPythonExec in executed plan")
+    }
+
+    val processingTime = 
pythonExec.metrics.get("pythonProcessingTime").map(_.value).getOrElse(0L)
+    val pythonTotalTime = 
pythonExec.metrics.get("pythonTotalTime").map(_.value).getOrElse(0L)
+
+    // Processing time should be non-zero
+    assert(processingTime > 0,
+      s"pythonProcessingTime should be > 0, but was $processingTime")
+
+    // Python total time should also be non-zero and >= processing time
+    assert(pythonTotalTime > 0 && pythonTotalTime >= processingTime,
+      s"pythonTotalTime should be > 0, but was $pythonTotalTime")
+  }
+
+  test("SPARK-55046:pythonProcessingTime metric for ArrowEvalPythonExec") {
+    assume(shouldTestPythonUDFs)
+    withSQLConf(SQLConf.ARROW_PYSPARK_EXECUTION_ENABLED.key -> "true") {
+      val df = spark.range(100)
+      val result = df.select(pythonTestUDF(col("id")))
+      result.collect()
+
+      val arrowExec = result.queryExecution.executedPlan.collectFirst {
+        case p: ArrowEvalPythonExec => p
+      }
+
+      // If Arrow execution is available, verify the metric, otherwise, skip 
the test
+      arrowExec.foreach { exec =>
+        val processingTime = 
exec.metrics.get("pythonProcessingTime").map(_.value).getOrElse(0L)
+        val pythonTotalTime = 
exec.metrics.get("pythonTotalTime").map(_.value).getOrElse(0L)
+
+        assert(processingTime > 0,
+          s"pythonProcessingTime should be > 0 for ArrowEvalPythonExec, but 
was $processingTime")
+        assert(pythonTotalTime > 0 && pythonTotalTime >= processingTime,
+          s"pythonTotalTime should be > 0 for ArrowEvalPythonExec, but was 
$pythonTotalTime")
+      }
+    }
+  }
+
+  test("SPARK-55046: pythonProcessingTime metric for BatchEvalPythonUDTFExec") 
{
+    assume(shouldTestPythonUDFs)
+    val udtf = TestPythonUDTF(name = "test_udtf")
+
+    spark.udtf.registerPython(udtf.name, udtf.udtf)
+    withTempView("t") {
+      try {
+        spark.range(1000).selectExpr("id % 100 as a", "id % 50 as b")
+          .createOrReplaceTempView("t")
+        val result = sql(s"SELECT f.* FROM t, LATERAL ${udtf.name}(a, b) f")
+        result.collect()
+
+        val udtfExec = result.queryExecution.executedPlan.collectFirst {
+          case p: BatchEvalPythonUDTFExec => p
+        }.getOrElse {
+          fail("Expected BatchEvalPythonUDTFExec in executed plan")
+        }
+
+        // Verify the metric exists and has a positive value
+        val processingTime = 
udtfExec.metrics.get("pythonProcessingTime").map(_.value).getOrElse(0L)
+        val pythonTotalTime = 
udtfExec.metrics.get("pythonTotalTime").map(_.value).getOrElse(0L)
+
+        assert(processingTime > 0,
+          s"pythonProcessingTime should be > 0 for BatchEvalPythonUDTFExec," +
+            s" but was $processingTime")
+        assert(pythonTotalTime > 0 && pythonTotalTime >= processingTime,
+          s"pythonTotalTime should be > 0 for BatchEvalPythonUDTFExec," +
+            s" but was $pythonTotalTime")
+      } finally {
+        spark.sessionState.catalog.dropTempFunction(udtf.name, 
ignoreIfNotExists = true)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to