This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2e83db4a17d6 [SPARK-55027][PYTHON] Move writeConf to PythonWorkerUtils
2e83db4a17d6 is described below
commit 2e83db4a17d65682af83784b592cd84ffe104281
Author: Tian Gao <[email protected]>
AuthorDate: Mon Jan 19 08:55:19 2026 +0800
[SPARK-55027][PYTHON] Move writeConf to PythonWorkerUtils
### What changes were proposed in this pull request?
Move the conf-serialization logic out of `BasePythonRunner.writeRunnerConf` and into a new `PythonWorkerUtils.writeConf` helper, so that it can be reused by other components such as `PythonPlannerRunner`.
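Since `writeUTF` frames each string as a 4-byte length followed by UTF-8 bytes, the conf is serialized as an entry count followed by alternating key/value strings. A minimal, self-contained sketch of that wire format (the demo object and conf key are illustrative, not Spark source):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object WriteConfDemo {
  // Mirrors the length-prefixed UTF-8 framing used by PythonWorkerUtils.writeUTF.
  def writeUTF(str: String, dataOut: DataOutputStream): Unit = {
    val bytes = str.getBytes(StandardCharsets.UTF_8)
    dataOut.writeInt(bytes.length)
    dataOut.write(bytes)
  }

  // Same shape as the new writeConf: entry count, then key/value string pairs.
  def writeConf(conf: Map[String, String], dataOut: DataOutputStream): Unit = {
    dataOut.writeInt(conf.size)
    for ((k, v) <- conf) {
      writeUTF(k, dataOut)
      writeUTF(v, dataOut)
    }
  }

  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    writeConf(Map("demo.conf.key" -> "value"), out) // illustrative key, not a real Spark conf
    out.flush()
    // 4 bytes for the count, plus (4 + payload) bytes per string.
    println(s"serialized ${bos.size()} bytes")
  }
}
```

Centralizing this in `PythonWorkerUtils` keeps the framing identical for every caller, so the Python side can reuse a single decoding path.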
### Why are the changes needed?
A follow-up change needs to pass some configs from `PythonPlannerRunner` to the Python worker to enable the profiler; this refactoring is a prerequisite for that.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The build passes; the rest is verified by the existing tests on CI.
### Was this patch authored or co-authored using generative AI tooling?
Yes, Cursor (claude-4.5-opus-high).
Closes #53789 from gaogaotiantian/move-write-runner-conf.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../org/apache/spark/api/python/PythonRunner.scala | 19 ++-----------------
.../apache/spark/api/python/PythonWorkerUtils.scala | 11 +++++++++++
2 files changed, 13 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 878f27961d5d..5f37e1f67b12 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -407,17 +407,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
    */
   protected def writeCommand(dataOut: DataOutputStream): Unit
 
-  /**
-   * Writes worker configuration to the stream connected to the Python worker.
-   */
-  protected def writeRunnerConf(dataOut: DataOutputStream): Unit = {
-    dataOut.writeInt(runnerConf.size)
-    for ((k, v) <- runnerConf) {
-      PythonWorkerUtils.writeUTF(k, dataOut)
-      PythonWorkerUtils.writeUTF(v, dataOut)
-    }
-  }
-
   /**
    * Writes input data to the stream connected to the Python worker.
    * Returns true if any data was written to the stream, false if the input is exhausted.
@@ -537,17 +526,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         }
       }
       val localProps = context.getLocalProperties.asScala
-      dataOut.writeInt(localProps.size)
-      localProps.foreach { case (k, v) =>
-        PythonRDD.writeUTF(k, dataOut)
-        PythonRDD.writeUTF(v, dataOut)
-      }
+      PythonWorkerUtils.writeConf(localProps.toMap, dataOut)
 
       PythonWorkerUtils.writeSparkFiles(jobArtifactUUID, pythonIncludes, dataOut)
       PythonWorkerUtils.writeBroadcasts(broadcastVars, worker, env, dataOut)
 
       dataOut.writeInt(evalType)
-      writeRunnerConf(dataOut)
+      PythonWorkerUtils.writeConf(runnerConf, dataOut)
       writeCommand(dataOut)
 
       dataOut.flush()
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerUtils.scala
index 0a6def051a34..db26dc28420a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerUtils.scala
@@ -153,6 +153,17 @@ private[spark] object PythonWorkerUtils extends Logging {
     writeBytes(func.command.toArray, dataOut)
   }
 
+  /**
+   * Write configuration in the form of key-value pairs.
+   */
+  def writeConf(conf: Map[String, String], dataOut: DataOutputStream): Unit = {
+    dataOut.writeInt(conf.size)
+    for ((k, v) <- conf) {
+      writeUTF(k, dataOut)
+      writeUTF(v, dataOut)
+    }
+  }
+
   /**
    * Read a string in UTF-8 charset.
    */
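For reference, the consumer of this framing is the Python worker, which reads back an entry count followed by UTF-8 key/value pairs. A self-contained sketch of a symmetric decoder (illustrative Scala only; the actual consumer lives in the Python worker):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object ReadConfDemo {
  // Reads one length-prefixed UTF-8 string, the inverse of writeUTF.
  def readUTF(dataIn: DataInputStream): String = {
    val bytes = new Array[Byte](dataIn.readInt())
    dataIn.readFully(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  // Reads the conf map back: entry count, then key/value string pairs.
  def readConf(dataIn: DataInputStream): Map[String, String] = {
    val n = dataIn.readInt()
    (0 until n).map(_ => readUTF(dataIn) -> readUTF(dataIn)).toMap
  }

  def main(args: Array[String]): Unit = {
    // Hand-encode one entry to round-trip through the decoder.
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeInt(1) // one entry
    for (s <- Seq("key", "value")) {
      val bytes = s.getBytes(StandardCharsets.UTF_8)
      out.writeInt(bytes.length)
      out.write(bytes)
    }
    // Prints: Map(key -> value)
    println(readConf(new DataInputStream(new ByteArrayInputStream(bos.toByteArray))))
  }
}
```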
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]