This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2b1369b4abed [SPARK-50499][PYTHON] Expose metrics from BasePythonRunner
2b1369b4abed is described below
commit 2b1369b4abed643d986ee6a22ec86371eadb831e
Author: Sebastian Hillig <[email protected]>
AuthorDate: Thu Dec 19 08:47:21 2024 +0900
[SPARK-50499][PYTHON] Expose metrics from BasePythonRunner
### What changes were proposed in this pull request?
Expose additional metrics from BasePythonRunner for all Python UDF
executions: bootTime, initTime, and totalTime.
### Why are the changes needed?
Metrics are currently only exposed as logs, but can be helpful to debug
slow queries.
### Does this PR introduce _any_ user-facing change?
Yes. Three additional Python worker timing metrics (boot time, init time, and
total time) are now exposed as SQL metrics for Python UDF executions.
### How was this patch tested?
Extended existing test cases.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49076 from sebastianhillig-db/add-basepythonrunner-metrics.
Lead-authored-by: Sebastian Hillig
<[email protected]>
Co-authored-by: Sebastian Hillig <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../main/scala/org/apache/spark/api/python/PythonRunner.scala | 8 ++++++--
.../execution/datasources/v2/python/PythonCustomMetric.scala | 2 ++
.../execution/python/ApplyInPandasWithStatePythonRunner.scala | 3 ++-
.../apache/spark/sql/execution/python/ArrowPythonRunner.scala | 2 +-
.../spark/sql/execution/python/ArrowPythonUDTFRunner.scala | 2 +-
.../sql/execution/python/CoGroupedArrowPythonRunner.scala | 2 +-
.../apache/spark/sql/execution/python/PythonSQLMetrics.scala | 10 ++++++++++
.../apache/spark/sql/execution/python/PythonUDFRunner.scala | 2 +-
.../python/TransformWithStateInPandasPythonRunner.scala | 3 ++-
.../org/apache/spark/sql/execution/python/PythonUDFSuite.scala | 5 ++++-
10 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index b7fb22bab844..a34cce980ae4 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -109,7 +109,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
protected val funcs: Seq[ChainedPythonFunctions],
protected val evalType: Int,
protected val argOffsets: Array[Array[Int]],
- protected val jobArtifactUUID: Option[String])
+ protected val jobArtifactUUID: Option[String],
+ protected val metrics: Map[String, AccumulatorV2[Long, Long]])
extends Logging {
require(funcs.length == argOffsets.length, "argOffsets should have the same
length as funcs")
@@ -522,6 +523,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
log"boot = ${MDC(LogKeys.BOOT_TIME, boot)}, " +
log"init = ${MDC(LogKeys.INIT_TIME, init)}, " +
log"finish = ${MDC(LogKeys.FINISH_TIME, finish)}")
+ metrics.get("pythonBootTime").foreach(_.add(boot))
+ metrics.get("pythonInitTime").foreach(_.add(init))
+ metrics.get("pythonTotalTime").foreach(_.add(total))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
@@ -824,7 +828,7 @@ private[spark] object PythonRunner {
private[spark] class PythonRunner(
funcs: Seq[ChainedPythonFunctions], jobArtifactUUID: Option[String])
extends BasePythonRunner[Array[Byte], Array[Byte]](
- funcs, PythonEvalType.NON_UDF, Array(Array(0)), jobArtifactUUID) {
+ funcs, PythonEvalType.NON_UDF, Array(Array(0)), jobArtifactUUID,
Map.empty) {
protected override def newWriter(
env: SparkEnv,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonCustomMetric.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonCustomMetric.scala
index bca1cbed7e70..7551cd04f20f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonCustomMetric.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonCustomMetric.scala
@@ -45,6 +45,8 @@ object PythonCustomMetric {
// See also `UserDefinedPythonDataSource.createPythonMetrics`.
PythonSQLMetrics.pythonSizeMetricsDesc.keys
.map(_ -> new SQLMetric("size", -1)).toMap ++
+ PythonSQLMetrics.pythonTimingMetricsDesc.keys
+ .map(_ -> new SQLMetric("timing", -1)).toMap ++
PythonSQLMetrics.pythonOtherMetricsDesc.keys
.map(_ -> new SQLMetric("sum", -1)).toMap
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
index ae982f2f87f2..d704638b85e8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
@@ -65,7 +65,8 @@ class ApplyInPandasWithStatePythonRunner(
stateValueSchema: StructType,
override val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
- extends BasePythonRunner[InType, OutType](funcs.map(_._1), evalType,
argOffsets, jobArtifactUUID)
+ extends BasePythonRunner[InType, OutType](
+ funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
with PythonArrowInput[InType]
with PythonArrowOutput[OutType] {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 72e9c5210194..579b49604685 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -38,7 +38,7 @@ abstract class BaseArrowPythonRunner(
override val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
- funcs.map(_._1), evalType, argOffsets, jobArtifactUUID)
+ funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
with BasicPythonArrowInput
with BasicPythonArrowOutput {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index f52b01b6646a..99a9e706c662 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -43,7 +43,7 @@ class ArrowPythonUDTFRunner(
jobArtifactUUID: Option[String])
extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
Seq(ChainedPythonFunctions(Seq(udtf.func))), evalType,
Array(argMetas.map(_.offset)),
- jobArtifactUUID)
+ jobArtifactUUID, pythonMetrics)
with BasicPythonArrowInput
with BasicPythonArrowOutput {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index 5670cad67e7b..c5e86d010938 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -51,7 +51,7 @@ class CoGroupedArrowPythonRunner(
profiler: Option[String])
extends BasePythonRunner[
(Iterator[InternalRow], Iterator[InternalRow]), ColumnarBatch](
- funcs.map(_._1), evalType, argOffsets, jobArtifactUUID)
+ funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
with BasicPythonArrowOutput {
override val pythonExec: String =
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
index 4df6d821c014..bd22739613ee 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
@@ -24,6 +24,8 @@ trait PythonSQLMetrics { self: SparkPlan =>
protected val pythonMetrics: Map[String, SQLMetric] = {
PythonSQLMetrics.pythonSizeMetricsDesc.map { case (k, v) =>
k -> SQLMetrics.createSizeMetric(sparkContext, v)
+ } ++ PythonSQLMetrics.pythonTimingMetricsDesc.map { case (k, v) =>
+ k -> SQLMetrics.createTimingMetric(sparkContext, v)
} ++ PythonSQLMetrics.pythonOtherMetricsDesc.map { case (k, v) =>
k -> SQLMetrics.createMetric(sparkContext, v)
}
@@ -40,6 +42,14 @@ object PythonSQLMetrics {
)
}
+ val pythonTimingMetricsDesc: Map[String, String] = {
+ Map(
+ "pythonBootTime" -> "total time to start Python workers",
+ "pythonInitTime" -> "total time to initialize Python workers",
+ "pythonTotalTime" -> "total time to run Python workers"
+ )
+ }
+
val pythonOtherMetricsDesc: Map[String, String] = {
Map("pythonNumRowsReceived" -> "number of output rows")
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 87ff5a0ec433..92310aa755db 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -36,7 +36,7 @@ abstract class BasePythonUDFRunner(
pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String])
extends BasePythonRunner[Array[Byte], Array[Byte]](
- funcs.map(_._1), evalType, argOffsets, jobArtifactUUID) {
+ funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics) {
override val pythonExec: String =
SQLConf.get.pysparkWorkerPythonExecutable.getOrElse(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
index c5980012124f..641e49657ca9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasPythonRunner.scala
@@ -173,7 +173,8 @@ abstract class
TransformWithStateInPandasPythonBaseRunner[I](
groupingKeySchema: StructType,
batchTimestampMs: Option[Long],
eventTimeWatermarkForEviction: Option[Long])
- extends BasePythonRunner[I, ColumnarBatch](funcs.map(_._1), evalType,
argOffsets, jobArtifactUUID)
+ extends BasePythonRunner[I, ColumnarBatch](
+ funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics)
with PythonArrowInput[I]
with BasicPythonArrowOutput
with Logging {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 4b46331be107..2f44994c301b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -91,7 +91,10 @@ class PythonUDFSuite extends QueryTest with
SharedSparkSession {
val pythonSQLMetrics = List(
"data sent to Python workers",
"data returned from Python workers",
- "number of output rows")
+ "number of output rows",
+ "total time to initialize Python workers",
+ "total time to start Python workers",
+ "total time to run Python workers")
val df = base.groupBy(pythonTestUDF(base("a") + 1))
.agg(pythonTestUDF(pythonTestUDF(base("a") + 1)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]