This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e9f390da288a [SPARK-54223][PYTHON] Add task context and data metrics
to Python runner logs
e9f390da288a is described below
commit e9f390da288ae5b30316e8568fd10d13619fb863
Author: mariaselvam.nishanth <[email protected]>
AuthorDate: Thu Nov 27 13:14:24 2025 +0900
[SPARK-54223][PYTHON] Add task context and data metrics to Python runner
logs
### What changes were proposed in this pull request?
Currently, the log messages in PythonRunner and related Python execution
classes do not include Spark task context information during Python UDF
execution.
This makes it harder to correlate Python worker timing metrics and data
processing statistics with the specific Spark tasks that executed the UDFs,
especially when debugging performance issues or data skew in production
environments.
This improvement adds task context details along with data processing
metrics to the log statements in PythonRunner and PythonUDFRunner classes to
enhance traceability and debugging of Python UDF execution.
**Current Behaviour**
When examining executor logs, there is a disconnect between task execution
logs and Python runner logs:
```
INFO PythonRunner: Times: total = 2529, boot = 478, init = 31, finish = 2020
❌ NO TASK CONTEXT - Cannot correlate with task 4188
❌ NO DATA METRICS - Cannot see records processed or data size
```
**Expected Behaviour**
After this enhancement, logs include task context information and data
metrics:
```
INFO PythonRunner: Starting Python task execution (Stage 32, Attempt 0) -
task 83.0 in stage 32 (TID 4188)
PythonRunner: Times: total = 2529, boot = 478, init = 31, finish = 2020 -
Records: 10000, Data: 2.45 MB - task 83.0 in stage 32 (TID 4188)
✅ INCLUDES TASK CONTEXT - Easy to correlate with task 4188
✅ INCLUDES DATA METRICS - Shows 10000 records processed, 2.45 MB data
```
### Why are the changes needed?
**Enable seamless correlation between task execution and Python UDF
operations.** Task context in the runner logs lets users match Python worker
timing and data metrics to the specific task (TID) that ran the UDF.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
**Run existing test suite:**
```bash
./build/mvn -pl core -am test
-DwildcardSuites=org.apache.spark.deploy.PythonRunnerSuite
```
**Result:**
```
Run completed in 1 second, 626 milliseconds.
Total number of tests run: 3
Suites: completed 2, aborted 0
Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO]
------------------------------------------------------------------------
[INFO] Reactor Summary for Spark Project Parent POM 4.2.0-SNAPSHOT:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [
1.554 s]
[INFO] Spark Project Tags ................................. SUCCESS [
1.885 s]
[INFO] Spark Project Common Java Utils .................... SUCCESS [
2.938 s]
[INFO] Spark Project Common Utils ......................... SUCCESS [
10.154 s]
[INFO] Spark Project Local DB ............................. SUCCESS [
7.751 s]
[INFO] Spark Project Networking ........................... SUCCESS [01:02
min]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [
11.541 s]
[INFO] Spark Project Variant .............................. SUCCESS [
2.006 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [
7.967 s]
[INFO] Spark Project Launcher ............................. SUCCESS [
3.467 s]
[INFO] Spark Project Core ................................. SUCCESS [02:31
min]
[INFO]
------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO]
------------------------------------------------------------------------
[INFO] Total time: 04:23 min
[INFO] Finished at: 2025-11-07T10:49:09+05:30
[INFO]
------------------------------------------------------------------------
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52931 from Nishanth28/feature/pythonrunner-task-context-logs.
Authored-by: mariaselvam.nishanth <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/api/python/PythonRunner.scala | 81 ++++++++++++++++------
.../sql/execution/python/PythonArrowOutput.scala | 9 +++
.../sql/execution/python/PythonUDFRunner.scala | 2 +
3 files changed, 69 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 7f1dc7fc86fc..81c4a5f4e811 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -32,8 +32,9 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
-import org.apache.spark.internal.{Logging, LogKeys, MessageWithContext}
-import org.apache.spark.internal.LogKeys.TASK_NAME
+import org.apache.spark.internal.{Logging, MessageWithContext}
+import org.apache.spark.internal.LogKeys
+import org.apache.spark.internal.LogKeys.{COUNT, PYTHON_WORKER_IDLE_TIMEOUT,
SIZE, TASK_NAME, TIME, TOTAL_TIME}
import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES}
import org.apache.spark.internal.config.Python._
import org.apache.spark.rdd.InputFileBlockHolder
@@ -134,6 +135,15 @@ private[spark] object BasePythonRunner extends Logging {
} else None
}
+ /**
+ * Creates a task identifier string for logging following Spark's standard
format.
+ * Format: "task <partition>.<attempt> in stage <stageId> (TID
<taskAttemptId>)"
+ */
+ private[spark] def taskIdentifier(context: TaskContext): String = {
+ s"task ${context.partitionId()}.${context.attemptNumber()} in stage
${context.stageId()} " +
+ s"(TID ${context.taskAttemptId()})"
+ }
+
private[spark] def pythonWorkerStatusMessageWithContext(
handle: Option[ProcessHandle],
worker: PythonWorker,
@@ -262,6 +272,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
val startTime = System.currentTimeMillis
val env = SparkEnv.get
+ // Log task context information at the start of computation
+ logInfo(log"Starting Python task execution - ${MDC(TASK_NAME,
taskIdentifier(context))}")
+
// Get the executor cores and pyspark memory, they are passed via the
local properties when
// the user specified them in a ResourceProfile.
val execCoresProp =
Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
@@ -340,7 +353,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
// Return an iterator that read lines from the process's stdout
val dataIn = new DataInputStream(new BufferedInputStream(
new ReaderInputStream(worker, writer, handle,
- faultHandlerEnabled, idleTimeoutSeconds, killOnIdleTimeout),
+ faultHandlerEnabled, idleTimeoutSeconds, killOnIdleTimeout, context),
bufferSize))
val stdoutIterator = newReaderIterator(
dataIn, writer, startTime, env, worker, handle.map(_.pid.toInt),
releasedOrClosed, context)
@@ -591,6 +604,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private var nextObj: OUT = _
private var eos = false
+ // Track batches and data size for logging
+ protected var batchesProcessed: Long = 0
+ protected var totalDataReceived: Long = 0
+
override def hasNext: Boolean = nextObj != null || {
if (!eos) {
nextObj = read()
@@ -626,10 +643,21 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
val init = initTime - bootTime
val finish = finishTime - initTime
val total = finishTime - startTime
- logInfo(log"Times: total = ${MDC(LogKeys.TOTAL_TIME, total)}, " +
- log"boot = ${MDC(LogKeys.BOOT_TIME, boot)}, " +
- log"init = ${MDC(LogKeys.INIT_TIME, init)}, " +
- log"finish = ${MDC(LogKeys.FINISH_TIME, finish)}")
+
+ // Format data size for readability
+ val dataKB = totalDataReceived / 1024.0
+ val dataMB = dataKB / 1024.0
+ val dataStr = if (dataMB >= 1.0) {
+ f"$dataMB%.2f MB"
+ } else {
+ f"$dataKB%.2f KB"
+ }
+
+ logInfo(log"Times: total = ${MDC(TOTAL_TIME, total)}, " +
+ log"boot = ${MDC(TIME, boot)}, init = ${MDC(TIME, init)}, " +
+ log"finish = ${MDC(TIME, finish)} - " +
+ log"Batches: ${MDC(COUNT, batchesProcessed)}, Data: ${MDC(SIZE,
dataStr)} - " +
+ log"${MDC(TASK_NAME, taskIdentifier(context))}")
metrics.get("pythonBootTime").foreach(_.add(boot))
metrics.get("pythonInitTime").foreach(_.add(init))
metrics.get("pythonTotalTime").foreach(_.add(total))
@@ -666,8 +694,11 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
throw new
TaskKilledException(context.getKillReason().getOrElse("unknown reason"))
case e: Exception if writer.exception.isDefined =>
- logError("Python worker exited unexpectedly (crashed)", e)
- logError("This may have been caused by a prior exception:",
writer.exception.get)
+ logError(log"Python worker exited unexpectedly (crashed) - " +
+ log"${MDC(TASK_NAME, taskIdentifier(context))}", e)
+ logError(log"This may have been caused by a prior exception - " +
+ log"${MDC(TASK_NAME, taskIdentifier(context))}",
+ writer.exception.get)
throw writer.exception.get
case e: IOException if !faultHandlerEnabled =>
@@ -709,16 +740,14 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
Thread.sleep(taskKillTimeout)
if (!context.isCompleted()) {
try {
- // Mimic the task name used in `Executor` to help the user find
out the task to blame.
- val taskName =
s"${context.partitionId()}.${context.attemptNumber()} " +
- s"in stage ${context.stageId()} (TID ${context.taskAttemptId()})"
- logWarning(log"Incomplete task ${MDC(TASK_NAME, taskName)} " +
- log"interrupted: Attempting to kill Python Worker")
+ logWarning(log"Incomplete task interrupted: Attempting to kill
Python Worker - " +
+ log"${MDC(TASK_NAME, taskIdentifier(context))}")
env.destroyPythonWorker(
pythonExec, workerModule, daemonModule, envVars.asScala.toMap,
worker)
} catch {
case e: Exception =>
- logError("Exception when trying to kill worker", e)
+ logError(log"Exception when trying to kill worker - " +
+ log"${MDC(TASK_NAME, taskIdentifier(context))}", e)
}
}
}
@@ -742,7 +771,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
handle: Option[ProcessHandle],
faultHandlerEnabled: Boolean,
idleTimeoutSeconds: Long,
- killOnIdleTimeout: Boolean) extends InputStream {
+ killOnIdleTimeout: Boolean,
+ context: TaskContext) extends InputStream {
private[this] var writerIfbhThreadLocalValue: Object = null
private[this] val temp = new Array[Byte](1)
private[this] val bufferStream = new DirectByteBufferOutputStream()
@@ -818,15 +848,17 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
} else {
logWarning(
log"Idle timeout reached for Python worker (timeout: " +
- log"${MDC(LogKeys.PYTHON_WORKER_IDLE_TIMEOUT,
idleTimeoutSeconds)} seconds). " +
- log"No data received from the worker process: " +
- pythonWorkerStatusMessageWithContext(handle, worker, hasInput ||
buffer.hasRemaining))
+ log"${MDC(PYTHON_WORKER_IDLE_TIMEOUT, idleTimeoutSeconds)}
seconds). " +
+ log"No data received from the worker process - " +
+ pythonWorkerStatusMessageWithContext(
+ handle, worker, hasInput || buffer.hasRemaining) +
+ log" - ${MDC(TASK_NAME, taskIdentifier(context))}")
if (killOnIdleTimeout) {
handle.foreach { handle =>
if (handle.isAlive) {
- logWarning(
- log"Terminating Python worker process due to idle timeout
(timeout: " +
- log"${MDC(LogKeys.PYTHON_WORKER_IDLE_TIMEOUT,
idleTimeoutSeconds)} seconds)")
+ logWarning(log"Terminating Python worker process due to idle
timeout " +
+ log"(timeout: ${MDC(PYTHON_WORKER_IDLE_TIMEOUT,
idleTimeoutSeconds)} " +
+ log"seconds) - ${MDC(TASK_NAME, taskIdentifier(context))}")
pythonWorkerKilled = handle.destroy()
}
}
@@ -1021,7 +1053,10 @@ private[spark] class PythonRunner(
try {
stream.readInt() match {
case length if length >= 0 =>
- PythonWorkerUtils.readBytes(length, stream)
+ val data = PythonWorkerUtils.readBytes(length, stream)
+ batchesProcessed += 1
+ totalDataReceived += length
+ data
case SpecialLengths.TIMING_DATA =>
handleTimingData()
read()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
index 018619a5207d..d677a9517a3f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
@@ -88,6 +88,14 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] {
self: BasePythonRunner[
super.handleEndOfDataSection()
}
+ protected override def handleTimingData(): Unit = {
+ // Get data size from pythonMetrics which is already being tracked
+ totalDataReceived = pythonMetrics.get("pythonDataReceived")
+ .map(_.value)
+ .getOrElse(0L)
+ super.handleTimingData()
+ }
+
protected override def read(): OUT = {
if (writer.exception.isDefined) {
throw writer.exception.get
@@ -96,6 +104,7 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] {
self: BasePythonRunner[
if (reader != null && batchLoaded) {
batchLoaded = processor.loadBatch()
if (batchLoaded) {
+ batchesProcessed += 1
val batch = processor.produceBatch()
deserializeColumnarBatch(batch, schema)
} else {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 759aa998832d..aae87bd94834 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -115,6 +115,8 @@ abstract class BasePythonUDFRunner(
case length if length >= 0 =>
val obj = PythonWorkerUtils.readBytes(length, stream)
pythonMetrics("pythonDataReceived") += length
+ batchesProcessed += 1
+ totalDataReceived += length
obj
case SpecialLengths.TIMING_DATA =>
handleTimingData()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]