Viacheslav Krot created SPARK-31386:
---------------------------------------
Summary: Reading broadcast in UDF raises MemoryError when
spark.executor.pyspark.memory is set
Key: SPARK-31386
URL: https://issues.apache.org/jira/browse/SPARK-31386
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.4.4
Environment: Spark 2.4.4 on AWS EMR
`pyspark --conf spark.executor.pyspark.memory=500m`
Reporter: Viacheslav Krot
The following code, which reads a broadcast variable inside a UDF, raises a
MemoryError when `spark.executor.pyspark.memory` is set:
```
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf

df = spark.createDataFrame([
    ('Alice', 10),
    ('Bob', 12)
], ['name', 'cnt'])

broadcast = spark.sparkContext.broadcast([1, 2, 3])

@udf(BooleanType())
def f(cnt):
    return cnt < len(broadcast.value)

df.filter(f(df.cnt)).count()
```
The same code works fine (the count returns 0) when
`spark.executor.pyspark.memory` is not set. The snippet itself is meaningless;
it is just the simplest code that reproduces the bug.
Error:
```
20/04/08 13:16:50 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 6, ip-172-31-32-201.us-east-2.compute.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/util.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in f
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py", line 148, in value
    self._value = self.load_from_path(self._path)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py", line 124, in load_from_path
    with open(path, 'rb', 1 << 20) as f:
MemoryError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
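A guess at the mechanism, based on the last Python frame above (not verified
against the Spark source): when `spark.executor.pyspark.memory` is set, the
Python worker caps its address space with an `RLIMIT_AS` rlimit, and
`Broadcast.load_from_path` then opens the broadcast file with a 1 MiB buffer
(`1 << 20`), whose allocation fails once the worker is near that cap. A
minimal standalone sketch of the interaction (assumes Linux, where RLIMIT_AS
is enforced; run in isolation it typically succeeds unless the interpreter is
already close to the limit):
```
import resource

# Sketch: mimic the worker-side memory cap that
# spark.executor.pyspark.memory=500m translates into (Linux only).
limit = 500 * 1024 * 1024
resource.setrlimit(resource.RLIMIT_AS, (limit, limit))

# Same call pattern as pyspark/broadcast.py line 124: open a file with a
# 1 MiB buffer. When the process is already near the address-space cap,
# allocating this buffer raises MemoryError.
with open('/etc/hostname', 'rb', 1 << 20) as f:
    f.read()
```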
pyspark command used to launch the shell:
`pyspark --conf spark.executor.pyspark.memory=500m`
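As a diagnostic (my addition, not part of the original repro), the limit the
worker actually applies can be inspected from inside a UDF via
`resource.getrlimit`; with the 500m setting this should report a finite
soft/hard pair rather than unlimited:
```
import resource
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Hypothetical check: report the address-space rlimit as seen by the
# Python worker on the executor.
@udf(StringType())
def as_rlimit(_):
    return str(resource.getrlimit(resource.RLIMIT_AS))

df.select(as_rlimit(df.name)).show(truncate=False)
```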