[
https://issues.apache.org/jira/browse/SPARK-55098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ruifeng Zheng updated SPARK-55098:
----------------------------------
Description:
{code:java}
from pyspark.sql.functions import udf, pandas_udf, arrow_udf
from pyspark.sql import functions as sf
import pyarrow as pa
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "-1")
spark.conf.set("spark.sql.execution.arrow.maxBytesPerOutputBatch", "3")
def get_size(iterator):
    for batch in iterator:
        if batch.num_rows > 0:
            yield pa.RecordBatch.from_arrays(
                [pa.array([batch.num_rows])], names=['size'])
spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
{code}
This query fails with:
{code:java}
SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4
times, most recent failure: Lost task 3.3 in stage 0.0 (TID 12) (10.68.161.10
executor 0): org.apache.spark.util.TaskCompletionListenerException: Memory was
leaked by query. Memory leaked: (12)
Allocator(stdin reader for /databricks/python/bin/python)
0/12/12/9223372036854775807 (res/actual/peak/limit){code}
If we remove the {{limit(1)}}, then {{spark.range(10).mapInArrow(get_size, "size long").collect()}} works as expected.
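For comparison, a minimal sketch of the working variant, assuming the same session, the two Arrow configs, and the {{get_size}} function defined above (only the {{limit(1)}} is dropped):
{code:java}
# Same reproduction without limit(1); per the report above this completes
# successfully instead of failing with the allocator memory-leak error.
spark.range(10).mapInArrow(get_size, "size long").collect()
{code}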
> Vectorized UDFs with output batch control fail with memory leak
> ---------------------------------------------------------------
>
> Key: SPARK-55098
> URL: https://issues.apache.org/jira/browse/SPARK-55098
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 4.2.0, 4.1.1
> Reporter: Ruifeng Zheng
> Priority: Critical
>
> {code:java}
> from pyspark.sql.functions import udf, pandas_udf, arrow_udf
> from pyspark.sql import functions as sf
> import pyarrow as pa
> spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "-1")
> spark.conf.set("spark.sql.execution.arrow.maxBytesPerOutputBatch", "3")
> def get_size(iterator):
>     for batch in iterator:
>         if batch.num_rows > 0:
>             yield pa.RecordBatch.from_arrays(
>                 [pa.array([batch.num_rows])], names=['size'])
> spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
> {code}
> This query fails with:
> {code:java}
> SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed
> 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 12)
> (10.68.161.10 executor 0):
> org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by
> query. Memory leaked: (12)
> Allocator(stdin reader for /databricks/python/bin/python)
> 0/12/12/9223372036854775807 (res/actual/peak/limit){code}
> If we remove the {{limit(1)}}, then {{spark.range(10).mapInArrow(get_size, "size long").collect()}} works as expected.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]