[ https://issues.apache.org/jira/browse/SPARK-35512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-35512:
------------------------------------

    Assignee: Apache Spark

> pyspark partitionBy may encounter 'OverflowError: cannot convert float infinity to integer'
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35512
>                 URL: https://issues.apache.org/jira/browse/SPARK-35512
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.2
>            Reporter: nolan liu
>            Assignee: Apache Spark
>            Priority: Major
>
> h2. Code sample
> {code:python}
> # pyspark
> rdd = ...
> new_rdd = rdd.partitionBy(64)
> {code}
> An OverflowError is raised when there is a {color:#ff0000}big input file{color} and {color:#ff0000}executor memory{color} is not big enough.
> h2. Error information
> {code:java}
> TaskSetManager: Lost task 312.0 in stage 1.0 (TID 748, 11.4.137.5, executor 83): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/opt/spark3/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
>     process()
>   File "/opt/spark3/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
>     serializer.dump_stream(out_iter, outfile)
>   File "/opt/spark3/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
>     for obj in iterator:
>   File "/opt/spark3/python/lib/pyspark.zip/pyspark/rdd.py", line 1899, in add_shuffle_key
> OverflowError: cannot convert float infinity to integer
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
> 	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
> 	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
> 	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:156)
> 	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:130)
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1420)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
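The Python-side exception in the traceback can be reproduced without Spark at all; it is ordinary CPython behaviour when a float infinity is converted to an integer. A minimal standalone illustration:

{code:python}
# CPython cannot represent float infinity as an int, so int() raises
# OverflowError with exactly the message seen in the executor traceback.
int(float("inf"))
# OverflowError: cannot convert float infinity to integer
{code}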
> h2. Spark code
> [https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L2072]
> {code:python}
> for k, v in iterator:
>     buckets[partitionFunc(k) % numPartitions].append((k, v))
>     c += 1
>
>     # check used memory and avg size of chunk of objects
>     if (c % 1000 == 0 and get_used_memory() > limit
>             or c > batch):
>         n, size = len(buckets), 0
>         for split in list(buckets.keys()):
>             yield pack_long(split)
>             d = outputSerializer.dumps(buckets[split])
>             del buckets[split]
>             yield d
>             size += len(d)
>
>         avg = int(size / n) >> 20
>         # let 1M < avg < 10M
>         if avg < 1:
>             batch *= 1.5
>         elif avg > 10:
>             batch = max(int(batch / 1.5), 1)
>         c = 0
> {code}
> h2. Explanation
> {{batch}} may grow to infinity while {{get_used_memory() > limit}} stays true: every flush then writes only a small chunk, so {{avg}} stays below 1 and {{batch *= 1.5}} runs on each pass. Because {{batch}} is a Python float, it eventually overflows to infinity, and the next time the {{avg > 10}} branch is taken, {{max(int(batch / 1.5), 1)}} raises {{OverflowError: cannot convert float infinity to integer}}.
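To make the failure mode concrete, below is a small standalone simulation of the adaptation logic above with all Spark plumbing removed; the starting value and the assumption that every flush sees {{avg < 1}} are illustrative, not taken from an actual job.

{code:python}
# Standalone simulation of the batch adaptation (illustrative values, no Spark APIs).
# Sustained memory pressure means every flush writes a tiny chunk, so the
# avg < 1 branch multiplies batch by 1.5 on every pass.
batch = 1000  # illustrative starting value

flushes = 0
while batch != float("inf"):
    batch *= 1.5  # int becomes float, then grows without bound until it overflows to inf
    flushes += 1

print("batch reached float infinity after %d flushes" % flushes)

# As soon as a later flush yields avg > 10, the shrinking branch runs and fails,
# which is the line reported in the executor traceback:
try:
    batch = max(int(batch / 1.5), 1)
except OverflowError as e:
    print(e)  # cannot convert float infinity to integer
{code}

One natural mitigation is to cap the growth so the value always stays convertible to an integer, for example something like {{batch = min(sys.maxsize, batch * 1.5)}}; whether that is exactly the change merged for this ticket is not determined here.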