This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d14bed9ae422 [SPARK-55098][PYTHON] Vectorized UDFs with output batch size control fail with memory leak
d14bed9ae422 is described below
commit d14bed9ae422630677e22ffe4253c1aaee970a45
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Jan 21 07:16:21 2026 +0900
[SPARK-55098][PYTHON] Vectorized UDFs with output batch size control fail with memory leak
### What changes were proposed in this pull request?
Fix a memory leak that occurs when the output stream is stopped before EOS.
Currently, memory is freed by `processor.close()` at EOS, but when the Python
node is followed by a `Limit` node, the downstream consumer stops pulling rows
early, so `processor.close()` is never called at the end of the task, causing
a memory leak.
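The failure mode is the classic end-of-iterator cleanup problem: cleanup that
only runs at EOS never runs when a downstream consumer stops early. A minimal,
self-contained Python analogy (not Spark code; all names are illustrative):
```
class Resource:
    # Stand-in for the Arrow allocator owned by the output processor.
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def produce(resource):
    for item in range(10):
        yield item
    resource.close()  # EOS cleanup: only reached if the iterator is fully drained


r = Resource()
for item in produce(r):
    break  # a downstream Limit stops after the first row

assert not r.closed  # cleanup never ran: this is the leak
```
This is why the fix ties the cleanup to the task lifecycle (a task completion
listener) instead of relying on reaching EOS.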
### Why are the changes needed?
```
import pyarrow as pa

spark.conf.set("spark.sql.execution.arrow.maxBytesPerOutputBatch", "3")

def get_size(iterator):
    for batch in iterator:
        if batch.num_rows > 0:
            yield pa.RecordBatch.from_arrays([pa.array([batch.num_rows])], names=['size'])

spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
```
fails with
```
26/01/20 17:45:48 ERROR BaseAllocator: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 0/12/12/9223372036854775807 (res/actual/peak/limit)
26/01/20 17:45:48 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 0/12/12/9223372036854775807 (res/actual/peak/limit)
    at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:504)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1(PythonArrowOutput.scala:81)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.$anonfun$new$1$adapted(PythonArrowOutput.scala:77)
    at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:146)
    at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:157)
    at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:157)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:212)
    at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:157)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:146)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:199)
    at org.apache.spark.scheduler.Task.run(Task.scala:147)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
26/01/20 17:45:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (12)
Allocator(stdin reader for /Users/ruifeng.zheng/.dev/miniconda3/envs/spark-dev-313/bin/python) 0/12/12/9223372036854775807 (res/actual/peak/limit)
```
If I remove the `limit(1)`, then
```
spark.range(10).mapInArrow(get_size, "size long").collect()
```
works as expected.
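For context (inferred from the patch rather than stated above):
`spark.sql.execution.arrow.maxBytesPerOutputBatch` caps the byte size of each
Arrow batch sent back to the JVM, and setting it routes output through the
slicing processor (`BaseSliceArrowOutputProcessor`) whose vectors were leaked.
A hedged sanity check, assuming the conf is the trigger:
```
# Hypothetical check: with the output-batch-size cap unset, the query should
# bypass the slicing processor entirely, so the leak should not reproduce.
spark.conf.unset("spark.sql.execution.arrow.maxBytesPerOutputBatch")
spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
```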
### Does this PR introduce _any_ user-facing change?
Yes, this is a bug fix.
After this fix:
```
In [2]: spark.range(10).mapInArrow(get_size, "size long").limit(1).collect()
Out[2]: [Row(size=1)]
```
### How was this patch tested?
Added a unit test (`test_map_in_arrow_with_limit`).
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #53867 from zhengruifeng/test_output_memory.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/tests/arrow/test_arrow_map.py | 10 ++++++++++
.../apache/spark/sql/execution/python/PythonArrowOutput.scala | 4 ++++
2 files changed, 14 insertions(+)
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_map.py b/python/pyspark/sql/tests/arrow/test_arrow_map.py
index 497fd73b0be6..f9928859b1bb 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_map.py
@@ -55,6 +55,16 @@ class MapInArrowTestsMixin(object):
         expected = df.collect()
         self.assertEqual(actual, expected)
 
+    def test_map_in_arrow_with_limit(self):
+        def get_size(iterator):
+            for batch in iterator:
+                assert isinstance(batch, pa.RecordBatch)
+                if batch.num_rows > 0:
+                    yield pa.RecordBatch.from_arrays([pa.array([batch.num_rows])], names=["size"])
+
+        df = self.spark.range(100)
+        df.mapInArrow(get_size, "size long").limit(1).collect()
+
     def test_multiple_columns(self):
         data = [(1, "foo"), (2, None), (3, "bar"), (4, "bar")]
         df = self.spark.createDataFrame(data, "a int, b string")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
index d677a9517a3f..38fd081a104f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
@@ -75,6 +75,9 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[
       private var processor: ArrowOutputProcessor = _
 
       context.addTaskCompletionListener[Unit] { _ =>
+        if (processor != null) {
+          processor.close()
+        }
         if (reader != null) {
           reader.close(false)
         }
@@ -250,6 +253,7 @@ abstract class BaseSliceArrowOutputProcessor(
       prevVectors.foreach(_.close())
       prevRoot.close()
     }
+    super.close()
   }
 }
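One design note (my reading of the patch, not stated in the message): with the
new listener, `processor.close()` can run both at EOS and again at task
completion, so the close path must be idempotent. A minimal sketch of that
guard pattern in Python (illustrative names only):
```
class OutputProcessor:
    # Illustrative stand-in for an Arrow output processor.
    def __init__(self):
        self._closed = False
        self._buffers = ["vector-a", "vector-b"]  # stand-in for Arrow vectors

    def close(self):
        # The guard makes close() idempotent: safe to call at EOS and
        # again from a task-completion hook without a double-free.
        if self._closed:
            return
        self._closed = True
        self._buffers.clear()  # release resources exactly once


p = OutputProcessor()
p.close()
p.close()  # second call is a no-op
```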