[
https://issues.apache.org/jira/browse/SPARK-31386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
JinxinTang updated SPARK-31386:
-------------------------------
Attachment: 选区_001.png
> Reading broadcast in UDF raises MemoryError when
> spark.executor.pyspark.memory is set
> -------------------------------------------------------------------------------------
>
> Key: SPARK-31386
> URL: https://issues.apache.org/jira/browse/SPARK-31386
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.4.4
> Environment: Spark 2.4.4 or AWS EMR
> `pyspark --conf spark.executor.pyspark.memory=500m`
> Reporter: Viacheslav Krot
> Priority: Major
> Attachments: 选区_001.png, 选区_267.png
>
>
> Following code with udf causes MemoryError when
> `spark.executor.pyspark.memory` is set
> ```
> from pyspark.sql.types import BooleanType
> from pyspark.sql.functions import udf
>
> df = spark.createDataFrame([
>     ('Alice', 10),
>     ('Bob', 12)
> ], ['name', 'cnt'])
>
> broadcast = spark.sparkContext.broadcast([1, 2, 3])
>
> @udf(BooleanType())
> def f(cnt):
>     return cnt < len(broadcast.value)
>
> df.filter(f(df.cnt)).count()
> ```
> The same code works fine when spark.executor.pyspark.memory is not set.
> The code itself is meaningless; it is just the simplest reproduction of the
> bug.
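>
> A note on the mechanism (my reading of the PySpark source, not stated in
> this ticket): when spark.executor.pyspark.memory is set, the Python worker
> enforces it by capping the process address space with resource.RLIMIT_AS,
> so any sizable allocation past the cap fails, not just steady heap growth.
> A simplified, hedged sketch of that enforcement (names and values condensed
> from pyspark/worker.py, not a verbatim copy):
> ```
> import resource
>
> # Simplified sketch of what pyspark/worker.py does when
> # spark.executor.pyspark.memory is set (assumed shape, condensed).
> memory_limit_mb = 500  # spark.executor.pyspark.memory=500m
> new_limit = memory_limit_mb * 1024 * 1024
>
> # RLIMIT_AS caps the total virtual address space of the worker process;
> # exceeding it makes allocations fail with MemoryError in CPython.
> resource.setrlimit(resource.RLIMIT_AS, (new_limit, new_limit))
> ```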
>
> Error:
> ```
> 20/04/08 13:16:50 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 6, ip-172-31-32-201.us-east-2.compute.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 377, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 372, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
>     self.serializer.dump_stream(self._batched(iterator), stream)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
>     for obj in iterator:
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/serializers.py", line 334, in _batched
>     for item in iterator:
>   File "<string>", line 1, in <lambda>
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
>     return lambda *a: f(*a)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/util.py", line 113, in wrapper
>     return f(*args, **kwargs)
>   File "<stdin>", line 3, in f
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py", line 148, in value
>     self._value = self.load_from_path(self._path)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1586348201584_0006/container_1586348201584_0006_01_000003/pyspark.zip/pyspark/broadcast.py", line 124, in load_from_path
>     with open(path, 'rb', 1 << 20) as f:
> MemoryError
>
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
>     at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
>     at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> ```
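>
> The last Python frame above shows where the cap bites: pyspark/broadcast.py
> line 124 passes a 1 MiB buffer size to open(), and once the worker's address
> space is already close to the RLIMIT_AS limit, that single allocation fails.
> A minimal, Linux-only sketch of the same failure mode outside Spark
> (hypothetical standalone script, not from the ticket; the headroom constant
> may need tuning per machine):
> ```
> import resource
>
> def vm_bytes():
>     # The first field of /proc/self/statm is the total virtual memory
>     # size of this process, in pages (Linux-specific).
>     with open('/proc/self/statm') as f:
>         pages = int(f.read().split()[0])
>     return pages * resource.getpagesize()
>
> # Cap the address space just above current usage, leaving less headroom
> # than the 1 MiB that the buffered open() below will try to allocate.
> limit = vm_bytes() + (1 << 19)
> resource.setrlimit(resource.RLIMIT_AS, (limit, limit))
>
> try:
>     # Same call shape as pyspark/broadcast.py line 124.
>     with open('/proc/self/statm', 'rb', 1 << 20) as f:
>         f.read()
> except MemoryError:
>     print('MemoryError reproduced')
> ```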
> Command used to launch pyspark:
> `pyspark --conf spark.executor.pyspark.memory=500m`
>
> EMR version: 5.27.0
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]