[ 
https://issues.apache.org/jira/browse/SPARK-55098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng updated SPARK-55098:
----------------------------------
    Description: 
{code:python}
from pyspark.sql.functions import udf, pandas_udf, arrow_udf
from pyspark.sql import functions as sf

import pyarrow as pa

spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "-1")
spark.conf.set("spark.sql.execution.arrow.maxBytesPerOutputBatch", "3")


def get_size(iterator):
    for batch in iterator:
        if batch.num_rows > 0:
            yield pa.RecordBatch.from_arrays(
                [pa.array([batch.num_rows])], names=['size']
            )

spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
{code}
This query fails with:
{code:java}
SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 12) (10.68.161.10 executor 0): org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /databricks/python/bin/python) 0/12/12/9223372036854775807 (res/actual/peak/limit)
{code}
If we remove the {{limit(1)}}, then
{{spark.range(10).mapInArrow(get_size, "size long").collect()}}
works as expected.
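
For reading the error, the allocator line packs four numbers into one slash-separated field. Below is a minimal sketch of pulling them apart, assuming the labels stand for reservation, currently allocated bytes, peak allocation, and limit (this is my reading of {{res/actual/peak/limit}}, not something stated in the report). The names {{AllocatorSummary}} and {{parse_allocator_summary}} are hypothetical helpers, not part of Spark or Arrow.

```python
from typing import NamedTuple


class AllocatorSummary(NamedTuple):
    """Hypothetical container for the four numbers in the allocator line."""
    reserved: int  # assumed meaning of "res"
    actual: int    # assumed: bytes still allocated
    peak: int      # assumed: peak bytes allocated
    limit: int     # assumed: allocation limit


def parse_allocator_summary(line: str) -> AllocatorSummary:
    """Parse a string like '0/12/12/9223372036854775807 (res/actual/peak/limit)'."""
    values, _, labels = line.strip().partition(" ")
    if labels != "(res/actual/peak/limit)":
        raise ValueError(f"unexpected label suffix: {labels!r}")
    parts = values.split("/")
    if len(parts) != 4:
        raise ValueError(f"expected 4 slash-separated numbers, got {len(parts)}")
    return AllocatorSummary(*(int(p) for p in parts))


# The numeric part of the allocator line from the error message above.
summary = parse_allocator_summary(
    "0/12/12/9223372036854775807 (res/actual/peak/limit)"
)
print(summary)
```

Under that reading, 12 bytes were still allocated when the task completion listener ran, which matches "Memory leaked: (12)", and the limit equals 2^63 - 1, so the allocator was effectively unbounded.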

> Vectorized UDFs with output batch control fail with memory leak
> ---------------------------------------------------------------
>
>                 Key: SPARK-55098
>                 URL: https://issues.apache.org/jira/browse/SPARK-55098
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.2.0, 4.1.1
>            Reporter: Ruifeng Zheng
>            Priority: Critical


