Tengfei Huang created SPARK-57593:
-------------------------------------
Summary: Python UDF: instrument and byte-bound the pickle input batch
Key: SPARK-57593
URL: https://issues.apache.org/jira/browse/SPARK-57593
Project: Spark
Issue Type: Task
Components: PySpark
Affects Versions: 4.2.0
Reporter: Tengfei Huang
For a regular (non-Arrow, pickle-serialized) Python UDF, `BatchEvalPythonExec`
groups input rows into batches purely by row count
(`spark.sql.execution.python.udf.maxRecordsPerBatch`, default 100) and pickles
the whole batch into one contiguous byte array before sending it to the Python
worker.
When the rows are wide (large string, binary, or decimal columns), a single
row-count batch can be very large, so the JVM must buffer one oversized pickled
array on the heap. The cost is amplified by the `ByteArrayOutputStream`
capacity-doubling grow-copy and the final `toByteArray()` copy (~2x transient).
On memory-constrained executors this can drive the executor into an
`OutOfMemoryError`, with no way to bound it other than lowering the row count
for every UDF.
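As a rough illustration of the row-width effect described above, the sketch
below uses Python's standard `pickle` module as a stand-in for the JVM-side
pickler. That substitution is an assumption, and the sizes Spark actually
produces will differ. The row shapes and the `pickled_batch_size` helper are
hypothetical; only the batch size of 100 comes from this issue.

```python
import pickle

# Row-count bound named in this issue (default 100).
MAX_RECORDS_PER_BATCH = 100


def pickled_batch_size(rows):
    """Size in bytes of one batch pickled as a single contiguous blob."""
    return len(pickle.dumps(rows))


# Hypothetical data: same row count, very different row widths.
narrow_rows = [(i, "x") for i in range(MAX_RECORDS_PER_BATCH)]
wide_rows = [(i, f"{i}-" + "x" * 100_000) for i in range(MAX_RECORDS_PER_BATCH)]

narrow_bytes = pickled_batch_size(narrow_rows)
wide_bytes = pickled_batch_size(wide_rows)

# Both batches satisfy the row-count bound, but the wide one is
# several orders of magnitude larger once serialized.
print(f"narrow batch: {narrow_bytes} bytes")
print(f"wide batch:   {wide_bytes} bytes")
```

The sketch only measures the final blob. It does not model the transient
buffer growth and copy on the JVM side, which would come on top of this size.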
The Arrow Python UDF path already byte-bounds its batches (it checks
`arrowWriter.sizeInBytes()` per row and slices before crossing the limit). The
pickle path has no equivalent: it is bounded only by row count. To keep such
memory pressure from crashing executors, we should also bound the batch by
byte size for pickle-serialized UDFs.
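One possible shape for such a bound, sketched in Python rather than in the
Scala of `BatchEvalPythonExec`: close the current batch when either the row
count or an estimated byte budget would be exceeded. The names
`byte_bounded_batches`, `max_bytes`, and `size_of` are hypothetical and do not
refer to existing Spark APIs or configs. Estimating row size by pickling each
row individually is also an assumption made for illustration.

```python
import pickle
from typing import Any, Callable, Iterable, Iterator, List


def byte_bounded_batches(
    rows: Iterable[Any],
    max_records: int,
    max_bytes: int,
    size_of: Callable[[Any], int] = lambda row: len(pickle.dumps(row)),
) -> Iterator[List[Any]]:
    """Yield batches bounded by both row count and estimated byte size.

    A batch is closed *before* adding a row that would push it past
    max_bytes. A single row larger than max_bytes still forms its own
    batch, so every row is emitted exactly once.
    """
    batch: List[Any] = []
    batch_bytes = 0
    for row in rows:
        row_bytes = size_of(row)
        full_by_count = len(batch) >= max_records
        full_by_bytes = batch_bytes + row_bytes > max_bytes
        if batch and (full_by_count or full_by_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(row)
        batch_bytes += row_bytes
    if batch:
        yield batch


# Usage with string rows, using len() as a simple size estimate.
rows = ["aaaa", "bbbb", "cccc", "d" * 10, "e", "f"]
batches = list(byte_bounded_batches(rows, max_records=3, max_bytes=8, size_of=len))
print(batches)
```

With narrow rows the row-count bound is reached first, so the behavior matches
the existing batching. The byte bound only takes effect when rows are wide.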