[ 
https://issues.apache.org/jira/browse/SPARK-55098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-55098:
-----------------------------------
    Labels: pull-request-available  (was: )

> Vectorized UDFs with output batch control fail with memory leak
> ---------------------------------------------------------------
>
>                 Key: SPARK-55098
>                 URL: https://issues.apache.org/jira/browse/SPARK-55098
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.2.0, 4.1.1
>            Reporter: Ruifeng Zheng
>            Priority: Critical
>              Labels: pull-request-available
>
> {code:java}
> import pyarrow as pa
> spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "-1")
> spark.conf.set("spark.sql.execution.arrow.maxBytesPerOutputBatch", "3")
> def get_size(iterator):
>     for batch in iterator:
>         if batch.num_rows > 0:
>             yield pa.RecordBatch.from_arrays([pa.array([batch.num_rows])], 
> names=['size'])
> spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
>  {code}
> this query fails with
> {code:java}
>  SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 
> 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 12) 
> (10.68.161.10 executor 0): 
> org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by 
> query. Memory leaked: (12)
> Allocator(stdin reader for /databricks/python/bin/python) 
> 0/12/12/9223372036854775807 (res/actual/peak/limit){code}
> if we remove the {{limit(1)}} , then
> {code:java}
>  spark.range(10).mapInArrow(get_size, "size long").collect(){code}
> works as expected



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to