Viacheslav Krot created SPARK-31386:
---------------------------------------

             Summary: Reading broadcast in UDF raises MemoryError when 
spark.executor.pyspark.memory is set
                 Key: SPARK-31386
                 URL: https://issues.apache.org/jira/browse/SPARK-31386
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.4.4
         Environment: Spark 2.4.4 or AWS EMR

`pyspark --conf spark.executor.pyspark.memory=500m`
            Reporter: Viacheslav Krot


Following code with udf causes MemoryError when `spark.executor.pyspark.memory` 
is set

```

from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf

df = spark.createDataFrame([
  ('Alice', 10),
  ('Bob', 12)
], ['name', 'cnt'])

broadcast = spark.sparkContext.broadcast([1,2,3])

@udf(BooleanType())
def f(cnt):
  return cnt < len(broadcast.value)

df.filter(f(df.cnt)).count()

```

Same code work well when spark.executor.pyspark.memory is not set. 

The code by itself does not make any sense, just simplest code to reproduce the 
bug.

 

Error:

```

20/04/08 13:16:50 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 6, 
ip-172-31-32-201.us-east-2.compute.internal, executor 2): 
org.apache.spark.api.python.PythonException: Traceback (most recent call 
last):20/04/08 13:16:50 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 6, 
ip-172-31-32-201.us-east-2.compute.internal, executor 2): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
 File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py",
 line 377, in main    process()  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py",
 line 372, in process    serializer.dump_stream(func(split_index, iterator), 
outfile)  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py",
 line 345, in dump_stream    
self.serializer.dump_stream(self._batched(iterator), stream)  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py",
 line 141, in dump_stream    for obj in iterator:  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py",
 line 334, in _batched    for item in iterator:  File "<string>", line 1, in 
<lambda>  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py",
 line 85, in <lambda>    return lambda *a: f(*a)  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/util.py",
 line 113, in wrapper    return f(*args, **kwargs)  File "<stdin>", line 3, in 
f  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py",
 line 148, in value    self._value = self.load_from_path(self._path)  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py",
 line 124, in load_from_path    with open(path, 'rb', 1 << 20) as f:MemoryError
 at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
 at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
 at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
 at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
 Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) 
at org.apache.spark.scheduler.Task.run(Task.scala:123) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)

```

pyspark command to launch:

`pyspark --conf spark.executor.pyspark.memory=500m`



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to