This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new afb4f822470a [SPARK-50752][PYTHON][SQL] Introduce configs for tuning 
Python UDF without Arrow
afb4f822470a is described below

commit afb4f822470a6576ab40047ee01b30d76cc4304f
Author: Jungtaek Lim <[email protected]>
AuthorDate: Wed Jan 8 17:23:01 2025 +0900

    [SPARK-50752][PYTHON][SQL] Introduce configs for tuning Python UDF without 
Arrow
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to introduce new configs for tuning Python UDF without 
Arrow.
    
    There are two major configurations for tuning in UDF:
    
    * batch size on serde of input/output (executor <-> python worker)
    * buffer size on writing to channel (executor -> python worker)
    
    The first one is hard-coded (100), and the second one is picked from a more 
general config (network buffer), which effectively gives users no proper 
chance to tune it.
    
    This PR enables users to tune both of the above via
    
    * batch size: `spark.sql.execution.python.udf.maxRecordsPerBatch`
    * buffer size: `spark.sql.execution.python.udf.buffer.size`
    
    ### Why are the changes needed?
    
    Explained above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will be able to tune Python UDFs without Arrow via the new 
configurations.
    
    ### How was this patch tested?
    
    New UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49397 from HeartSaVioR/SPARK-50752.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/api/python/PythonRunner.scala |  4 ++++
 python/pyspark/sql/tests/test_udf.py               | 27 ++++++++++++++++++++++
 python/pyspark/worker.py                           |  3 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 18 +++++++++++++++
 .../sql/execution/python/BatchEvalPythonExec.scala | 11 ++++++---
 .../execution/python/BatchEvalPythonUDTFExec.scala |  9 ++++++--
 .../sql/execution/python/PythonUDFRunner.scala     |  2 ++
 7 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index a34cce980ae4..e3d10574419b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -129,6 +129,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   protected val pythonExec: String = funcs.head.funcs.head.pythonExec
   protected val pythonVer: String = funcs.head.funcs.head.pythonVer
 
+  protected val batchSizeForPythonUDF: Int = 100
+
   // WARN: Both configurations, 'spark.python.daemon.module' and 
'spark.python.worker.module' are
   // for very advanced users and they are experimental. This should be 
considered
   // as expert-only option, and shouldn't be used before knowing what it means 
exactly.
@@ -212,6 +214,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (faultHandlerEnabled) {
       envVars.put("PYTHON_FAULTHANDLER_DIR", 
BasePythonRunner.faultHandlerLogDir.toString)
     }
+    // allow the user to set the batch size for the BatchedSerializer on UDFs
+    envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", 
jobArtifactUUID.getOrElse("default"))
 
diff --git a/python/pyspark/sql/tests/test_udf.py 
b/python/pyspark/sql/tests/test_udf.py
index 819391389237..67d243cd2924 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -1229,6 +1229,33 @@ class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
         super(BaseUDFTestsMixin, cls).setUpClass()
         cls.spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", 
"false")
 
+    # We cannot check whether the batch size is effective or not. We just run 
the query with
+    # various batch size and see whether the query runs successfully, and the 
output is
+    # consistent across different batch sizes.
+    def test_udf_with_various_batch_size(self):
+        self.spark.catalog.registerFunction("twoArgs", lambda x, y: len(x) + 
y, IntegerType())
+        for batch_size in [1, 33, 1000, 2000]:
+            with 
self.sql_conf({"spark.sql.execution.python.udf.maxRecordsPerBatch": 
batch_size}):
+                df = self.spark.range(1000).selectExpr("twoArgs('test', id) AS 
ret").orderBy("ret")
+                rets = [x["ret"] for x in df.collect()]
+                self.assertEqual(rets, list(range(4, 1004)))
+
+    # We cannot check whether the buffer size is effective or not. We just run 
the query with
+    # various buffer size and see whether the query runs successfully, and the 
output is
+    # consistent across different buffer sizes.
+    def test_udf_with_various_buffer_size(self):
+        self.spark.catalog.registerFunction("twoArgs", lambda x, y: len(x) + 
y, IntegerType())
+        for batch_size in [1, 33, 10000]:
+            with self.sql_conf({"spark.sql.execution.python.udf.buffer.size": 
batch_size}):
+                df = (
+                    self.spark.range(1000)
+                    .repartition(1)
+                    .selectExpr("twoArgs('test', id) AS ret")
+                    .orderBy("ret")
+                )
+                rets = [x["ret"] for x in df.collect()]
+                self.assertEqual(rets, list(range(4, 1004)))
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 712f71d3861a..e799498cdd80 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1569,7 +1569,8 @@ def read_udfs(pickleSer, infile, eval_type):
                 arrow_cast,
             )
     else:
-        ser = BatchedSerializer(CPickleSerializer(), 100)
+        batch_size = int(os.environ.get("PYTHON_UDF_BATCH_SIZE", "100"))
+        ser = BatchedSerializer(CPickleSerializer(), batch_size)
 
     is_profiling = read_bool(infile)
     if is_profiling:
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7bc4051b45d4..875e9543c472 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3377,6 +3377,24 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val PYTHON_UDF_MAX_RECORDS_PER_BATCH =
+    buildConf("spark.sql.execution.python.udf.maxRecordsPerBatch")
+      .doc("When using Python UDFs, limit the maximum number of records that 
can be batched " +
+        "for serialization/deserialization.")
+      .version("4.0.0")
+      .intConf
+      .checkValue(_ > 0, "The value of 
spark.sql.execution.python.udf.maxRecordsPerBatch " +
+        "must be positive.")
+      .createWithDefault(100)
+
+  val PYTHON_UDF_BUFFER_SIZE =
+    buildConf("spark.sql.execution.python.udf.buffer.size")
+      .doc(
+        s"Same as `${BUFFER_SIZE.key}` but only applies to Python UDF 
executions. If it is not " +
+        s"set, the fallback is `${BUFFER_SIZE.key}`.")
+      .version("4.0.0")
+      .fallbackConf(BUFFER_SIZE)
+
   val PANDAS_UDF_BUFFER_SIZE =
     buildConf("spark.sql.execution.pandas.udf.buffer.size")
       .doc(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index e6958392cad4..28318a319b08 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.EvalPythonExec.ArgumentMetadata
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
@@ -39,10 +40,12 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], 
resultAttrs: Seq[Attribute]
   private[this] val jobArtifactUUID = 
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
 
   override protected def evaluatorFactory: EvalPythonEvaluatorFactory = {
+    val batchSize = conf.getConf(SQLConf.PYTHON_UDF_MAX_RECORDS_PER_BATCH)
     new BatchEvalPythonEvaluatorFactory(
       child.output,
       udfs,
       output,
+      batchSize,
       pythonMetrics,
       jobArtifactUUID,
       conf.pythonUDFProfiler)
@@ -56,6 +59,7 @@ class BatchEvalPythonEvaluatorFactory(
     childOutput: Seq[Attribute],
     udfs: Seq[PythonUDF],
     output: Seq[Attribute],
+    batchSize: Int,
     pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String],
     profiler: Option[String])
@@ -70,7 +74,7 @@ class BatchEvalPythonEvaluatorFactory(
     EvaluatePython.registerPicklers() // register pickler for Row
 
     // Input iterator to Python.
-    val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema)
+    val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema, 
batchSize)
 
     // Output iterator for results from Python.
     val outputIterator =
@@ -107,7 +111,8 @@ class BatchEvalPythonEvaluatorFactory(
 object BatchEvalPythonExec {
   def getInputIterator(
       iter: Iterator[InternalRow],
-      schema: StructType): Iterator[Array[Byte]] = {
+      schema: StructType,
+      batchSize: Int): Iterator[Array[Byte]] = {
     val dataTypes = schema.map(_.dataType)
     val needConversion = 
dataTypes.exists(EvaluatePython.needConversionInPython)
 
@@ -140,6 +145,6 @@ object BatchEvalPythonExec {
         }
         fields
       }
-    }.grouped(100).map(x => pickle.dumps(x.toArray))
+    }.grouped(batchSize).map(x => pickle.dumps(x.toArray))
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala
index 9eebd4ea7e79..c0dcb7781742 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala
@@ -23,8 +23,9 @@ import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.Unpickler
 
-import org.apache.spark.{JobArtifactSet, TaskContext}
+import org.apache.spark.{JobArtifactSet, SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, 
PythonWorkerUtils}
+import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -63,7 +64,8 @@ case class BatchEvalPythonUDTFExec(
     EvaluatePython.registerPicklers()  // register pickler for Row
 
     // Input iterator to Python.
-    val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema)
+    // For Python UDTF, we don't have a separate configuration for the batch 
size yet.
+    val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema, 100)
 
     // Output iterator for results from Python.
     val outputIterator =
@@ -101,6 +103,9 @@ class PythonUDTFRunner(
     Seq((ChainedPythonFunctions(Seq(udtf.func)), udtf.resultId.id)),
     PythonEvalType.SQL_TABLE_UDF, Array(argMetas.map(_.offset)), 
pythonMetrics, jobArtifactUUID) {
 
+  // Overriding here to NOT use the same value of UDF config in UDTF.
+  override val bufferSize: Int = SparkEnv.get.conf.get(BUFFER_SIZE)
+
   override protected def writeUDF(dataOut: DataOutputStream): Unit = {
     PythonUDTFRunner.writeUDTF(dataOut, udtf, argMetas)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 92310aa755db..167e1fd8b0f0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -46,6 +46,8 @@ abstract class BasePythonUDFRunner(
 
   override val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
 
+  override val bufferSize: Int = 
SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
+
   protected def writeUDF(dataOut: DataOutputStream): Unit
 
   protected override def newWriter(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to