This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new afb4f822470a [SPARK-50752][PYTHON][SQL] Introduce configs for tuning
Python UDF without Arrow
afb4f822470a is described below
commit afb4f822470a6576ab40047ee01b30d76cc4304f
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Jan 8 17:23:01 2025 +0900
[SPARK-50752][PYTHON][SQL] Introduce configs for tuning Python UDF without
Arrow
### What changes were proposed in this pull request?
This PR proposes to introduce new configs for tuning Python UDF without
Arrow.
There are two major configurations for tuning in UDF:
* batch size on serde of input/output (executor <-> python worker)
* buffer size on writing to channel (executor -> python worker)
The first one is hard-coded (100), and the second one is picked from more
general config (network buffer), which effectively does not give a proper
chance to tune.
This PR enables users to tune the above twos, via
* batch size: `spark.sql.execution.python.udf.maxRecordsPerBatch`
* buffer size: `spark.sql.execution.python.udf.buffer.size`
### Why are the changes needed?
Explained in the description above.
### Does this PR introduce _any_ user-facing change?
Yes, users will be able to tune for Python UDF without Arrow, with new
configuration.
### How was this patch tested?
New UTs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49397 from HeartSaVioR/SPARK-50752.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/api/python/PythonRunner.scala | 4 ++++
python/pyspark/sql/tests/test_udf.py | 27 ++++++++++++++++++++++
python/pyspark/worker.py | 3 ++-
.../org/apache/spark/sql/internal/SQLConf.scala | 18 +++++++++++++++
.../sql/execution/python/BatchEvalPythonExec.scala | 11 ++++++---
.../execution/python/BatchEvalPythonUDTFExec.scala | 9 ++++++--
.../sql/execution/python/PythonUDFRunner.scala | 2 ++
7 files changed, 68 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index a34cce980ae4..e3d10574419b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -129,6 +129,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
protected val pythonExec: String = funcs.head.funcs.head.pythonExec
protected val pythonVer: String = funcs.head.funcs.head.pythonVer
+ protected val batchSizeForPythonUDF: Int = 100
+
// WARN: Both configurations, 'spark.python.daemon.module' and
'spark.python.worker.module' are
// for very advanced users and they are experimental. This should be
considered
// as expert-only option, and shouldn't be used before knowing what it means
exactly.
@@ -212,6 +214,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (faultHandlerEnabled) {
envVars.put("PYTHON_FAULTHANDLER_DIR",
BasePythonRunner.faultHandlerLogDir.toString)
}
+ // allow the user to set the batch size for the BatchedSerializer on UDFs
+ envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
envVars.put("SPARK_JOB_ARTIFACT_UUID",
jobArtifactUUID.getOrElse("default"))
diff --git a/python/pyspark/sql/tests/test_udf.py
b/python/pyspark/sql/tests/test_udf.py
index 819391389237..67d243cd2924 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -1229,6 +1229,33 @@ class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
super(BaseUDFTestsMixin, cls).setUpClass()
cls.spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled",
"false")
+ # We cannot check whether the batch size is effective or not. We just run
the query with
+ # various batch sizes and see whether the query runs successfully, and the
output is
+ # consistent across different batch sizes.
+ def test_udf_with_various_batch_size(self):
+ self.spark.catalog.registerFunction("twoArgs", lambda x, y: len(x) +
y, IntegerType())
+ for batch_size in [1, 33, 1000, 2000]:
+ with
self.sql_conf({"spark.sql.execution.python.udf.maxRecordsPerBatch":
batch_size}):
+ df = self.spark.range(1000).selectExpr("twoArgs('test', id) AS
ret").orderBy("ret")
+ rets = [x["ret"] for x in df.collect()]
+ self.assertEqual(rets, list(range(4, 1004)))
+
+ # We cannot check whether the buffer size is effective or not. We just run
the query with
+ # various buffer sizes and see whether the query runs successfully, and the
output is
+ # consistent across different buffer sizes.
+ def test_udf_with_various_buffer_size(self):
+ self.spark.catalog.registerFunction("twoArgs", lambda x, y: len(x) +
y, IntegerType())
+ for batch_size in [1, 33, 10000]:
+ with self.sql_conf({"spark.sql.execution.python.udf.buffer.size":
batch_size}):
+ df = (
+ self.spark.range(1000)
+ .repartition(1)
+ .selectExpr("twoArgs('test', id) AS ret")
+ .orderBy("ret")
+ )
+ rets = [x["ret"] for x in df.collect()]
+ self.assertEqual(rets, list(range(4, 1004)))
+
class UDFInitializationTests(unittest.TestCase):
def tearDown(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 712f71d3861a..e799498cdd80 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1569,7 +1569,8 @@ def read_udfs(pickleSer, infile, eval_type):
arrow_cast,
)
else:
- ser = BatchedSerializer(CPickleSerializer(), 100)
+ batch_size = int(os.environ.get("PYTHON_UDF_BATCH_SIZE", "100"))
+ ser = BatchedSerializer(CPickleSerializer(), batch_size)
is_profiling = read_bool(infile)
if is_profiling:
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7bc4051b45d4..875e9543c472 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3377,6 +3377,24 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val PYTHON_UDF_MAX_RECORDS_PER_BATCH =
+ buildConf("spark.sql.execution.python.udf.maxRecordsPerBatch")
+ .doc("When using Python UDFs, limit the maximum number of records that
can be batched " +
+ "for serialization/deserialization.")
+ .version("4.0.0")
+ .intConf
+ .checkValue(_ > 0, "The value of
spark.sql.execution.python.udf.maxRecordsPerBatch " +
+ "must be positive.")
+ .createWithDefault(100)
+
+ val PYTHON_UDF_BUFFER_SIZE =
+ buildConf("spark.sql.execution.python.udf.buffer.size")
+ .doc(
+ s"Same as `${BUFFER_SIZE.key}` but only applies to Python UDF
executions. If it is not " +
+ s"set, the fallback is `${BUFFER_SIZE.key}`.")
+ .version("4.0.0")
+ .fallbackConf(BUFFER_SIZE)
+
val PANDAS_UDF_BUFFER_SIZE =
buildConf("spark.sql.execution.pandas.udf.buffer.size")
.doc(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index e6958392cad4..28318a319b08 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.python.EvalPythonExec.ArgumentMetadata
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}
/**
@@ -39,10 +40,12 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF],
resultAttrs: Seq[Attribute]
private[this] val jobArtifactUUID =
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
override protected def evaluatorFactory: EvalPythonEvaluatorFactory = {
+ val batchSize = conf.getConf(SQLConf.PYTHON_UDF_MAX_RECORDS_PER_BATCH)
new BatchEvalPythonEvaluatorFactory(
child.output,
udfs,
output,
+ batchSize,
pythonMetrics,
jobArtifactUUID,
conf.pythonUDFProfiler)
@@ -56,6 +59,7 @@ class BatchEvalPythonEvaluatorFactory(
childOutput: Seq[Attribute],
udfs: Seq[PythonUDF],
output: Seq[Attribute],
+ batchSize: Int,
pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String],
profiler: Option[String])
@@ -70,7 +74,7 @@ class BatchEvalPythonEvaluatorFactory(
EvaluatePython.registerPicklers() // register pickler for Row
// Input iterator to Python.
- val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema)
+ val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema,
batchSize)
// Output iterator for results from Python.
val outputIterator =
@@ -107,7 +111,8 @@ class BatchEvalPythonEvaluatorFactory(
object BatchEvalPythonExec {
def getInputIterator(
iter: Iterator[InternalRow],
- schema: StructType): Iterator[Array[Byte]] = {
+ schema: StructType,
+ batchSize: Int): Iterator[Array[Byte]] = {
val dataTypes = schema.map(_.dataType)
val needConversion =
dataTypes.exists(EvaluatePython.needConversionInPython)
@@ -140,6 +145,6 @@ object BatchEvalPythonExec {
}
fields
}
- }.grouped(100).map(x => pickle.dumps(x.toArray))
+ }.grouped(batchSize).map(x => pickle.dumps(x.toArray))
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala
index 9eebd4ea7e79..c0dcb7781742 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala
@@ -23,8 +23,9 @@ import scala.jdk.CollectionConverters._
import net.razorvine.pickle.Unpickler
-import org.apache.spark.{JobArtifactSet, TaskContext}
+import org.apache.spark.{JobArtifactSet, SparkEnv, TaskContext}
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType,
PythonWorkerUtils}
+import org.apache.spark.internal.config.BUFFER_SIZE
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -63,7 +64,8 @@ case class BatchEvalPythonUDTFExec(
EvaluatePython.registerPicklers() // register pickler for Row
// Input iterator to Python.
- val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema)
+ // For Python UDTF, we don't have a separate configuration for the batch
size yet.
+ val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema, 100)
// Output iterator for results from Python.
val outputIterator =
@@ -101,6 +103,9 @@ class PythonUDTFRunner(
Seq((ChainedPythonFunctions(Seq(udtf.func)), udtf.resultId.id)),
PythonEvalType.SQL_TABLE_UDF, Array(argMetas.map(_.offset)),
pythonMetrics, jobArtifactUUID) {
+ // Overriding here to NOT use the same value of UDF config in UDTF.
+ override val bufferSize: Int = SparkEnv.get.conf.get(BUFFER_SIZE)
+
override protected def writeUDF(dataOut: DataOutputStream): Unit = {
PythonUDTFRunner.writeUDTF(dataOut, udtf, argMetas)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 92310aa755db..167e1fd8b0f0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -46,6 +46,8 @@ abstract class BasePythonUDFRunner(
override val faultHandlerEnabled: Boolean =
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
+ override val bufferSize: Int =
SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
+
protected def writeUDF(dataOut: DataOutputStream): Unit
protected override def newWriter(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]