This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d14bed9ae422 [SPARK-55098][PYTHON] Vectorized UDFs with output batch 
size control fail with memory leak
d14bed9ae422 is described below

commit d14bed9ae422630677e22ffe4253c1aaee970a45
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Jan 21 07:16:21 2026 +0900

    [SPARK-55098][PYTHON] Vectorized UDFs with output batch size control fail 
with memory leak
    
    ### What changes were proposed in this pull request?
    Fix a memory leak that when the output stream is stopped before EOS
    
    Currently, memory is freed by `processor.close()` at EOS, but when python 
node is followed by a `Limit` node, the `processor.close()` is not called at 
the end of task and cause memory leak.
    
    ### Why are the changes needed?
    
    ```
    import pyarrow as pa
    
    spark.conf.set("spark.sql.execution.arrow.maxBytesPerOutputBatch", "3")
    
    def get_size(iterator):
        for batch in iterator:
            if batch.num_rows > 0:
                yield pa.RecordBatch.from_arrays([pa.array([batch.num_rows])], 
names=['size'])
    
    spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
    ```
    
    fails with
    ```
    26/01/20 17:45:48 ERROR BaseAllocator: Memory was leaked by query. Memory 
leaked: (12)
    Allocator(stdin reader for 
/Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 
0/12/12/9223372036854775807 (res/actual/peak/limit)
    
    26/01/20 17:45:48 ERROR TaskContextImpl: Error in TaskCompletionListener
    java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: 
(12)
    Allocator(stdin reader for 
/Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 
0/12/12/9223372036854775807 (res/actual/peak/limit)
    
            at 
org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:504)
            at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1(PythonArrowOutput.scala:81)
            at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1$adapted(PythonArrowOutput.scala:77)
            at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:146)
            at 
org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:157)
            at 
org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:157)
            at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:212)
            at 
org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:157)
            at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:146)
            at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:199)
            at org.apache.spark.scheduler.Task.run(Task.scala:147)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
            at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
            at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
            at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
            at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
            at java.base/java.lang.Thread.run(Thread.java:1583)
    26/01/20 17:45:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by 
query. Memory leaked: (12)
    Allocator(stdin reader for 
/Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 
0/12/12/9223372036854775807 (res/actual/peak/limit)
    ```
    
    If I remove the `limit(1)`, then
    ```
    spark.range(10).mapInArrow(get_size, "size long").collect()
    ```
    works as expected.
    
    ### Does this PR introduce _any_ user-facing change?
    yes, bug-fix
    
    after this fix
    ```
    In [2]: spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
    Out[2]: [Row(size=1)]
    ```
    
    ### How was this patch tested?
    added tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53867 from zhengruifeng/test_output_memory.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/tests/arrow/test_arrow_map.py               | 10 ++++++++++
 .../apache/spark/sql/execution/python/PythonArrowOutput.scala  |  4 ++++
 2 files changed, 14 insertions(+)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow_map.py 
b/python/pyspark/sql/tests/arrow/test_arrow_map.py
index 497fd73b0be6..f9928859b1bb 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_map.py
@@ -55,6 +55,16 @@ class MapInArrowTestsMixin(object):
         expected = df.collect()
         self.assertEqual(actual, expected)
 
+    def test_map_in_arrow_with_limit(self):
+        def get_size(iterator):
+            for batch in iterator:
+                assert isinstance(batch, pa.RecordBatch)
+                if batch.num_rows > 0:
+                    yield 
pa.RecordBatch.from_arrays([pa.array([batch.num_rows])], names=["size"])
+
+        df = self.spark.range(100)
+        df.mapInArrow(get_size, "size long").limit(1).collect()
+
     def test_multiple_columns(self):
         data = [(1, "foo"), (2, None), (3, "bar"), (4, "bar")]
         df = self.spark.createDataFrame(data, "a int, b string")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
index d677a9517a3f..38fd081a104f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
@@ -75,6 +75,9 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] { 
self: BasePythonRunner[
       private var processor: ArrowOutputProcessor = _
 
       context.addTaskCompletionListener[Unit] { _ =>
+        if (processor != null) {
+          processor.close()
+        }
         if (reader != null) {
           reader.close(false)
         }
@@ -250,6 +253,7 @@ abstract class BaseSliceArrowOutputProcessor(
       prevVectors.foreach(_.close())
       prevRoot.close()
     }
+    super.close()
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to