[ https://issues.apache.org/jira/browse/SPARK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17080257#comment-17080257 ]
JinxinTang edited comment on SPARK-31386 at 4/10/20, 5:32 AM:
--------------------------------------------------------------

Or could you please try Spark 2.4.5? The problem cannot be reproduced there either: [download link|https://www.apache.org/dyn/closer.lua/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz]

> Reading broadcast in UDF raises MemoryError when spark.executor.pyspark.memory is set
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-31386
>                 URL: https://issues.apache.org/jira/browse/SPARK-31386
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.4
>         Environment: Spark 2.4.4 on AWS EMR
>                      `pyspark --conf spark.executor.pyspark.memory=500m`
>            Reporter: Viacheslav Krot
>            Priority: Major
>         Attachments: 选区_267.png
>
> The following UDF code raises MemoryError when `spark.executor.pyspark.memory` is set:
> ```
> from pyspark.sql.types import BooleanType
> from pyspark.sql.functions import udf
>
> df = spark.createDataFrame([
>     ('Alice', 10),
>     ('Bob', 12)
> ], ['name', 'cnt'])
>
> broadcast = spark.sparkContext.broadcast([1, 2, 3])
>
> @udf(BooleanType())
> def f(cnt):
>     return cnt < len(broadcast.value)
>
> df.filter(f(df.cnt)).count()
> ```
> The same code works fine when spark.executor.pyspark.memory is not set.
> The code is not meant to do anything useful by itself; it is just the simplest snippet that reproduces the bug.
>
> Error:
> ```
> 20/04/08 13:16:50 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 6, ip-172-31-32-201.us-east-2.compute.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 377, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 372, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
>     self.serializer.dump_stream(self._batched(iterator), stream)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
>     for obj in iterator:
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 334, in _batched
>     for item in iterator:
>   File "<string>", line 1, in <lambda>
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
>     return lambda *a: f(*a)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/util.py", line 113, in wrapper
>     return f(*args, **kwargs)
>   File "<stdin>", line 3, in f
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py", line 148, in value
>     self._value = self.load_from_path(self._path)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py", line 124, in load_from_path
>     with open(path, 'rb', 1 << 20) as f:
> MemoryError
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
>     at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
>     at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> ```
> pyspark command to launch:
> `pyspark --conf spark.executor.pyspark.memory=500m`
>
> EMR version: 5.27.0
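For what it's worth, the MemoryError at `open(path, 'rb', 1 << 20)` is consistent with how `spark.executor.pyspark.memory` is enforced: in Spark 2.4.x the Python worker caps its own virtual address space with `resource.setrlimit(resource.RLIMIT_AS, ...)` (see `python/pyspark/worker.py`), so the first allocation past the cap fails, and here that happens to be the 1 MiB read buffer requested when loading the broadcast value. A minimal, Linux-only sketch of that mechanism, not Spark source code; the 200 MiB cap and the oversized allocation below are arbitrary demo values standing in for the configured limit and the worker's footprint:

```
import resource

# Cap this process's virtual address space at 200 MiB (hypothetical demo
# value; Spark derives the real cap from spark.executor.pyspark.memory).
limit = 200 * 1024 * 1024
resource.setrlimit(resource.RLIMIT_AS, (limit, limit))

try:
    # Stands in for whatever allocation first pushes the worker past the
    # cap -- in the traceback above it is the 1 MiB buffer requested by
    # open(path, 'rb', 1 << 20) in pyspark/broadcast.py.
    buf = bytearray(500 * 1024 * 1024)
except MemoryError:
    print("MemoryError: allocation exceeded RLIMIT_AS")
```

Since RLIMIT_AS counts virtual address space rather than resident memory, a Python worker can already sit close to a 500m cap before the broadcast file is even opened, which would explain why the same job succeeds when the limit is unset.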