Tengfei Huang created SPARK-57593:
-------------------------------------

             Summary: Python UDF: instrument and byte-bound the pickle input batch
                 Key: SPARK-57593
                 URL: https://issues.apache.org/jira/browse/SPARK-57593
             Project: Spark
          Issue Type: Task
          Components: PySpark
    Affects Versions: 4.2.0
            Reporter: Tengfei Huang


For a regular (non-Arrow, pickle-serialized) Python UDF, `BatchEvalPythonExec` 
groups input rows into batches purely by row count 
(`spark.sql.execution.python.udf.maxRecordsPerBatch`, default 100) and pickles 
the whole batch into one contiguous byte array before sending it to the Python 
worker.
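
A rough Python analogue of row-count-only batching, for illustration only. The
JVM side does its own pickling, and the function name `row_count_batches` and
the `Row` alias are hypothetical. Each batch holds up to `max_records` rows and
is pickled into one `bytes` object, so the size of that object grows with row
width, not just row count.

```python
import pickle
from itertools import islice
from typing import Iterable, Iterator, List, Tuple

# Hypothetical row type: a tuple of column values.
Row = Tuple[object, ...]


def row_count_batches(rows: Iterable[Row], max_records: int = 100) -> Iterator[bytes]:
    """Group rows by count only and pickle each group into one contiguous blob.

    Nothing here looks at how large the rows are, so a batch of wide rows
    produces a proportionally large blob.
    """
    it = iter(rows)
    while True:
        batch: List[Row] = list(islice(it, max_records))
        if not batch:
            return
        # The whole batch is serialized into a single bytes object at once.
        yield pickle.dumps(batch, protocol=pickle.HIGHEST_PROTOCOL)
```

With `max_records=100`, 100 rows that each carry a ~10 KB string already
produce a pickled batch larger than 1 MB; making every value 100x wider makes
the batch roughly 100x larger while the row-count limit stays the same.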

When the rows are wide (large string, binary, or decimal columns), a batch
bounded only by row count can be very large, so the JVM must buffer one
oversized pickled array on the heap. The cost is amplified by the
`ByteArrayOutputStream` power-of-two grow-copy and the final `toByteArray` copy,
which roughly double the transient footprint (~2x). On memory-constrained
executors this can drive the executor into an `OutOfMemoryError`, and the only
way to bound it is to lower the row count, which applies to every UDF.
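
A back-of-the-envelope model of the peak bytes held in byte arrays while one
batch is serialized. It assumes (a simplification, not the JDK source) that the
buffer starts at 32 bytes (the no-argument `ByteArrayOutputStream` default),
that every write is small enough for each grow to exactly double capacity, and
that the old array stays live until its grow-copy finishes. The helper
`peak_transient_bytes` is hypothetical.

```python
def peak_transient_bytes(payload: int, initial_capacity: int = 32) -> int:
    """Peak bytes live in byte arrays while writing `payload` bytes into a
    doubling buffer and then copying it out at its exact size.

    Simplified model: each grow doubles capacity, the old and new arrays are
    both live during the grow-copy, and nothing is reclaimed mid-copy.
    """
    capacity = initial_capacity
    peak = capacity
    while capacity < payload:
        new_capacity = capacity * 2
        # Grow-copy: old array and new array coexist.
        peak = max(peak, capacity + new_capacity)
        capacity = new_capacity
    # toByteArray-style copy: the full buffer plus an exact-size result array.
    peak = max(peak, capacity + payload)
    return peak
```

Under these assumptions a payload that is an exact power of two peaks at
exactly 2x its size (the full buffer plus the `toByteArray` copy), while a
payload just past a power of two forces one more doubling and peaks closer
to 3x.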

The Arrow Python UDF path already byte-bounds its batches: it checks
`arrowWriter.sizeInBytes()` per row and slices the batch before it crosses the
limit. The pickle path has no equivalent and is bounded only by row count. To
keep oversized batches from crashing executors, the pickle-serialized UDF path
should also bound each batch by size.
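
A minimal sketch of byte-bounded batching for the pickle path, in the same
spirit as the Arrow path's per-row size check. The function name, the
`max_bytes` parameter, and the per-row size estimate (each row pickled on its
own with Python's `pickle`) are all hypothetical; the ticket does not name a
config or an estimation method. A batch is cut when adding the next row would
exceed either cap, and a single row larger than `max_bytes` still goes out as
its own one-row batch because it cannot be split.

```python
import pickle
from typing import Iterable, Iterator, List, Tuple

# Hypothetical row type: a tuple of column values.
Row = Tuple[object, ...]


def byte_bounded_batches(
    rows: Iterable[Row], max_records: int = 100, max_bytes: int = 1 << 20
) -> Iterator[List[Row]]:
    """Yield batches capped by both row count and estimated pickled size."""
    batch: List[Row] = []
    batch_bytes = 0
    for row in rows:
        # Hypothetical estimate: the row's standalone pickled length.
        row_bytes = len(pickle.dumps(row, protocol=pickle.HIGHEST_PROTOCOL))
        # Flush before this row would push the batch over either cap.
        if batch and (len(batch) >= max_records or batch_bytes + row_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        # An empty batch always accepts the row, so oversized rows still progress.
        batch.append(row)
        batch_bytes += row_bytes
    if batch:
        yield batch
```

The caller would then pickle each yielded batch as before. Measuring every row
with a separate pickle keeps the cut logic easy to see but doubles
serialization work; an implementation would more plausibly track the bytes a
streaming pickler has already written, much as the Arrow path reads
`arrowWriter.sizeInBytes()`.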



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
